Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 BigQuery Denormalized Destination: Support for more bigquery types through the format annotation #6145

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "079d5540-f236-4294-ba7c-ade8fd918496",
"name": "BigQuery (denormalized typed struct)",
"dockerRepository": "airbyte/destination-bigquery-denormalized",
"dockerImageTag": "0.1.5",
"dockerImageTag": "0.1.6",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery"
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
- destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
name: BigQuery (denormalized typed struct)
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 0.1.5
dockerImageTag: 0.1.6
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
name: Google Cloud Storage (GCS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class BigQueryDenormalizedDestination extends BigQueryDestination {
protected static final String PROPERTIES_FIELD = "properties";
protected static final String NESTED_ARRAY_FIELD = "value";
private static final String TYPE_FIELD = "type";
// Key looked up on a field definition to find its declared format, if any.
private static final String FORMAT_FIELD = "format";

@Override
protected String getTargetTableName(String streamName) {
Expand Down Expand Up @@ -154,6 +155,16 @@ private static Builder getField(BigQuerySQLNameTransformer namingResolver, Strin
}
}
}

// If a specific format is defined, use their specific type instead of the JSON's one
final JsonNode fieldFormat = fieldDefinition.get(FORMAT_FIELD);
if (fieldFormat != null) {
final JsonSchemaFormat schemaFormat = JsonSchemaFormat.fromJsonSchemaFormat(fieldFormat.asText());
if (schemaFormat != null) {
builder.setType(schemaFormat.getBigQueryType());
}
}

return builder;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.bigquery;

import com.google.cloud.bigquery.StandardSQLTypeName;

/**
 * Mapping of JsonSchema formats to BigQuery Standard SQL types.
 *
 * <p>Each constant pairs the text of a JSON-schema {@code format} value with the
 * {@link StandardSQLTypeName} that should be used for a field declaring that format.
 */
public enum JsonSchemaFormat {

  DATE("date", StandardSQLTypeName.DATE),
  DATETIME("date-time", StandardSQLTypeName.DATETIME),
  TIME("time", StandardSQLTypeName.TIME);

  private final String jsonSchemaFormat;
  private final StandardSQLTypeName bigQueryType;

  JsonSchemaFormat(final String jsonSchemaFormat, final StandardSQLTypeName bigQueryType) {
    this.jsonSchemaFormat = jsonSchemaFormat;
    this.bigQueryType = bigQueryType;
  }

  /**
   * Finds the constant whose JSON-schema format text equals {@code value} exactly
   * (case-sensitive).
   *
   * @param value the format text to look up; may be {@code null}
   * @return the matching constant, or {@code null} when {@code value} is {@code null} or does not
   *         match any known format
   */
  public static JsonSchemaFormat fromJsonSchemaFormat(String value) {
    for (final JsonSchemaFormat type : values()) {
      // Compare from the constant's side so a null value yields "no match" instead of an NPE.
      if (type.jsonSchemaFormat.equals(value)) {
        return type;
      }
    }
    return null;
  }

  /**
   * @return the JSON-schema format text associated with this constant
   */
  public String getJsonSchemaFormat() {
    return jsonSchemaFormat;
  }

  /**
   * @return the BigQuery Standard SQL type associated with this constant
   */
  public StandardSQLTypeName getBigQueryType() {
    return bigQueryType;
  }

  /**
   * @return the JSON-schema format text, same as {@link #getJsonSchemaFormat()}
   */
  @Override
  public String toString() {
    return jsonSchemaFormat;
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
Expand All @@ -60,6 +64,7 @@
import java.util.stream.StreamSupport;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand All @@ -84,6 +89,10 @@ class BigQueryDenormalizedDestinationTest {
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(getDataWithEmptyObjectAndArray())
.withEmittedAt(NOW.toEpochMilli()));
// RECORD message on the users stream whose payload is built by getDataWithFormats().
private static final AirbyteMessage MESSAGE_USERS3 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(getDataWithFormats())
.withEmittedAt(NOW.toEpochMilli()));

private JsonNode config;

Expand Down Expand Up @@ -119,6 +128,7 @@ void setup(TestInfo info) throws IOException {
final String datasetLocation = "EU";
MESSAGE_USERS1.getRecord().setNamespace(datasetId);
MESSAGE_USERS2.getRecord().setNamespace(datasetId);
MESSAGE_USERS3.getRecord().setNamespace(datasetId);

final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build();
dataset = bigquery.create(datasetInfo);
Expand Down Expand Up @@ -186,7 +196,39 @@ void testNestedWrite(JsonNode schema, AirbyteMessage message) throws Exception {
assertEquals(extractJsonValues(resultJson, "name"), extractJsonValues(expectedUsersJson, "name"));
assertEquals(extractJsonValues(resultJson, "grants"), extractJsonValues(expectedUsersJson, "grants"));
assertEquals(extractJsonValues(resultJson, "domain"), extractJsonValues(expectedUsersJson, "domain"));
}

/**
 * Writes a single record through the denormalized destination using a schema whose string fields
 * carry "date" and "date-time" formats, then checks both the stored values and the schema of the
 * resulting table.
 */
@Test
void testWriteWithFormat() throws Exception {
  catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream()
      .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getSchemaWithFormats()))
      .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE)));

  final BigQueryDestination destination = new BigQueryDenormalizedDestination();
  final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);

  consumer.accept(MESSAGE_USERS3);
  consumer.close();

  final List<JsonNode> usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME);
  final JsonNode expectedUsersJson = MESSAGE_USERS3.getRecord().getData();

  // JUnit's assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
  assertEquals(1, usersActual.size());
  final JsonNode resultJson = usersActual.get(0);
  assertEquals(extractJsonValues(expectedUsersJson, "name"), extractJsonValues(resultJson, "name"));
  assertEquals(extractJsonValues(expectedUsersJson, "date_of_birth"), extractJsonValues(resultJson, "date_of_birth"));

  // Bigquery's datetime type accepts multiple input format but always outputs the same, so we can't expect to receive the value we sent.
  assertEquals(Set.of("2018-08-19T12:11:35.220"), extractJsonValues(resultJson, "updated_at"));

  final Schema expectedSchema = Schema.of(
      Field.of("name", StandardSQLTypeName.STRING),
      Field.of("date_of_birth", StandardSQLTypeName.DATE),
      Field.of("updated_at", StandardSQLTypeName.DATETIME),
      Field.of(JavaBaseConstants.COLUMN_NAME_AB_ID, StandardSQLTypeName.STRING),
      Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP));

  assertEquals(expectedSchema, BigQueryUtils.getTableDefinition(bigquery, dataset.getDatasetId().getDataset(), USERS_STREAM_NAME).getSchema());
}

private Set<String> extractJsonValues(JsonNode node, String attributeName) {
Expand Down Expand Up @@ -272,6 +314,34 @@ private static JsonNode getSchema() {

}

/**
 * Builds the JSON schema used by the format test: an object with a plain string "name", a
 * "date_of_birth" string declared with format "date", and an "updated_at" string declared with
 * format "date-time".
 */
private static JsonNode getSchemaWithFormats() {
  final String schemaJson =
      "{\n" +
      " \"type\": [\n" +
      " \"object\"\n" +
      " ],\n" +
      " \"properties\": {\n" +
      " \"name\": {\n" +
      " \"type\": [\n" +
      " \"string\"\n" +
      " ]\n" +
      " },\n" +
      " \"date_of_birth\": {\n" +
      " \"type\": [\n" +
      " \"string\"\n" +
      " ],\n" +
      " \"format\": \"date\"\n" +
      " },\n" +
      " \"updated_at\": {\n" +
      " \"type\": [\n" +
      " \"string\"\n" +
      " ],\n" +
      " \"format\": \"date-time\"\n" +
      " }\n" +
      " }\n" +
      "}";
  return Jsons.deserialize(schemaJson);
}

private static JsonNode getSchemaWithInvalidArrayType() {
return Jsons.deserialize(
"{\n"
Expand Down Expand Up @@ -330,7 +400,15 @@ private static JsonNode getData() {
+ " }\n"
+ " ]\n"
+ "}");
}

/**
 * Builds the record payload used by the format test: a name, a "date_of_birth" date string and an
 * "updated_at" date-time string.
 */
private static JsonNode getDataWithFormats() {
  final String recordJson =
      "{\n" +
      " \"name\": \"Andrii\",\n" +
      " \"date_of_birth\": \"1996-01-25\",\n" +
      " \"updated_at\": \"2018-08-19 12:11:35.22\"\n" +
      "}";
  return Jsons.deserialize(recordJson);
}

private static JsonNode getDataWithEmptyObjectAndArray() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,9 @@ public static String getDatasetLocation(JsonNode config) {
}
}

/**
 * Looks up a table by dataset and table name and returns its definition.
 *
 * @param bigquery client used to fetch the table
 * @param datasetName name of the dataset containing the table
 * @param tableName name of the table
 * @return the definition of the fetched table
 */
static TableDefinition getTableDefinition(BigQuery bigquery, String datasetName, String tableName) {
  // NOTE(review): the getTable result is dereferenced without a null check — confirm callers only
  // pass tables that exist.
  return bigquery.getTable(TableId.of(datasetName, tableName)).getDefinition();
}

}
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.6 | 2021-09-16 | [#6145](https://github.com/airbytehq/airbyte/pull/6145) | BigQuery Denormalized support for date, datetime & time types through the json "format" key |
| 0.1.5 | 2021-09-07 | [#5881](https://github.com/airbytehq/airbyte/pull/5881) | BigQuery Denormalized NPE fix |
| 0.1.4 | 2021-09-04 | [#5813](https://github.com/airbytehq/airbyte/pull/5813) | fix Stackoverflow error when receive a schema from source where "Array" type doesn't contain a required "items" element |
| 0.1.3 | 2021-08-07 | [#5261](https://github.com/airbytehq/airbyte/pull/5261) | 🐛 Destination BigQuery(Denormalized): Fix processing arrays of records |
Expand Down