Skip to content

Commit

Permalink
🎉Destination-bigquery-denorm: fixed emittedAt timestamp (#11978)
Browse files Browse the repository at this point in the history
* [11067] Destination-bigquery-denorm: fixed emittedAt timestamp
  • Loading branch information
etsybaev authored Apr 15, 2022
1 parent 45acd75 commit 281e0ee
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
- name: BigQuery (denormalized typed struct)
destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 0.3.0
dockerImageTag: 0.3.1
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
icon: bigquery.svg
resourceRequirements:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@
- "overwrite"
- "append"
- "append_dedup"
- dockerImage: "airbyte/destination-bigquery-denormalized:0.3.0"
- dockerImage: "airbyte/destination-bigquery-denormalized:0.3.1"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.version=0.3.1
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
Expand Down Expand Up @@ -141,7 +142,9 @@ public JsonNode formatRecord(final AirbyteRecordMessage recordMessage) {
}

protected void addAirbyteColumns(final ObjectNode data, final AirbyteRecordMessage recordMessage) {
final long emittedAtMicroseconds = recordMessage.getEmittedAt();
// currently emittedAt time is in millis format from airbyte message
final long emittedAtMicroseconds = TimeUnit.MICROSECONDS.convert(
recordMessage.getEmittedAt(), TimeUnit.MILLISECONDS);
final String formattedEmittedAt = QueryParameterValue.timestamp(emittedAtMicroseconds).getValue();

data.put(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ void testWriteWithFormat() throws Exception {

// Bigquery's datetime type accepts multiple input format but always outputs the same, so we can't
// expect to receive the value we sent.
assertEquals(extractJsonValues(resultJson, "updated_at"), Set.of("2021-10-11T06:36:53"));
assertEquals(Set.of("2021-10-11T06:36:53"), extractJsonValues(resultJson, "updated_at"));

final Schema expectedSchema = Schema.of(
Field.of("name", StandardSQLTypeName.STRING),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,24 @@
import static org.junit.jupiter.params.provider.Arguments.arguments;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Field.Mode;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.bigquery.formatter.GcsBigQueryDenormalizedRecordFormatter;
import io.airbyte.integrations.destination.bigquery.util.TestBigQueryDenormalizedRecordFormatter;
import io.airbyte.integrations.destination.bigquery.util.TestGcsBigQueryDenormalizedRecordFormatter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;

public class BigQueryDenormalizedUtilsTest {

Expand Down Expand Up @@ -119,7 +125,8 @@ void testSchemaWithDateTime() {
final Field items = fields.get("items");
assertEquals(1, items.getSubFields().size());
assertEquals(LegacySQLTypeName.RECORD, items.getType());
assertEquals(LegacySQLTypeName.TIMESTAMP, items.getSubFields().get("nested_datetime").getType());
assertEquals(LegacySQLTypeName.TIMESTAMP,
items.getSubFields().get("nested_datetime").getType());
}

@Test
Expand Down Expand Up @@ -206,6 +213,22 @@ void testSchemaWithNestedDatetimeInsideNullObject() {

}

@Test
public void testEmittedAtTimeConversion() {
final TestBigQueryDenormalizedRecordFormatter mockedFormatter = Mockito.mock(
TestBigQueryDenormalizedRecordFormatter.class, Mockito.CALLS_REAL_METHODS);

final ObjectMapper mapper = new ObjectMapper();
final ObjectNode objectNode = mapper.createObjectNode();

final AirbyteRecordMessage airbyteRecordMessage = new AirbyteRecordMessage();
airbyteRecordMessage.setEmittedAt(1602637589000L);
mockedFormatter.addAirbyteColumns(objectNode, airbyteRecordMessage);

assertEquals("2020-10-14 01:06:29.000000+00:00",
objectNode.get(JavaBaseConstants.COLUMN_NAME_EMITTED_AT).textValue());
}

private static Stream<Arguments> actualAndExpectedSchemasProvider() {
return Stream.of(
arguments(getSchema(), getExpectedSchema()),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery.util;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryDenormalizedRecordFormatter;
import io.airbyte.protocol.models.AirbyteRecordMessage;

public class TestBigQueryDenormalizedRecordFormatter extends
DefaultBigQueryDenormalizedRecordFormatter {

public TestBigQueryDenormalizedRecordFormatter(
JsonNode jsonSchema,
StandardNameTransformer namingResolver) {
super(jsonSchema, namingResolver);
}

@Override
public void addAirbyteColumns(ObjectNode data,
AirbyteRecordMessage recordMessage) {
// this method just exposes a protected method for testing making it public
super.addAirbyteColumns(data, recordMessage);
}

}
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ This uploads data directly from your source to BigQuery. While this is faster to

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------| :--- |
| 0.3.1 | 2022-04-15 | [11978](https://github.com/airbytehq/airbyte/pull/11978) | Fixed emittedAt timestamp. |
| 0.3.0 | 2022-04-06 | [11776](https://github.com/airbytehq/airbyte/pull/11776) | Use serialized buffering strategy to reduce memory consumption. |
| 0.2.15 | 2022-04-05 | [11166](https://github.com/airbytehq/airbyte/pull/11166) | Fixed handling of anyOf and allOf fields |
| 0.2.14 | 2022-04-02 | [11620](https://github.com/airbytehq/airbyte/pull/11620) | Updated spec |
Expand Down

0 comments on commit 281e0ee

Please sign in to comment.