diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 9aa8a5992440..4ddbd1763acd 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -27,7 +27,7 @@ - name: BigQuery destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 dockerRepository: airbyte/destination-bigquery - dockerImageTag: 1.2.1 + dockerImageTag: 1.2.3 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery icon: bigquery.svg resourceRequirements: @@ -40,7 +40,7 @@ - name: BigQuery (denormalized typed struct) destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496 dockerRepository: airbyte/destination-bigquery-denormalized - dockerImageTag: 1.2.2 + dockerImageTag: 1.2.3 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery icon: bigquery.svg resourceRequirements: diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 43186bdfd42b..f321e4b92a86 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -285,7 +285,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-bigquery:1.2.1" +- dockerImage: "airbyte/destination-bigquery:1.2.3" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery" connectionSpecification: @@ -495,7 +495,7 @@ - "overwrite" - "append" - "append_dedup" -- dockerImage: "airbyte/destination-bigquery-denormalized:1.2.2" +- dockerImage: "airbyte/destination-bigquery-denormalized:1.2.3" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery" connectionSpecification: diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile index 42c63f9de784..a1dd7c01d333 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile @@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.2.2 +LABEL io.airbyte.version=1.2.3 LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java index 5caf9d1c71f9..7f0b45eb9e0c 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java @@ -87,8 +87,8 @@ protected void putStreamIntoUploaderMap(AirbyteStream stream, UploaderConfig uploaderConfig, Map> uploaderMap) throws IOException { - Table existingTable = - uploaderConfig.getBigQuery().getTable(uploaderConfig.getConfigStream().getStream().getNamespace(), uploaderConfig.getTargetTableName()); + String datasetId = BigQueryUtils.sanitizeDatasetId(uploaderConfig.getConfigStream().getStream().getNamespace()); + Table existingTable = uploaderConfig.getBigQuery().getTable(datasetId, uploaderConfig.getTargetTableName()); BigQueryRecordFormatter formatter = uploaderConfig.getFormatter(); if (existingTable != null) { diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index eb6067b29281..3b5d34f7465e 100644 --- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.2.1 +LABEL io.airbyte.version=1.2.3 LABEL io.airbyte.name=airbyte/destination-bigquery diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index fc96b7463b82..803eddfaf3db 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -55,6 +55,7 @@ import java.util.function.Consumer; import java.util.function.Function; import org.apache.avro.Schema; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -209,7 +210,9 @@ protected Map> getUp final Map> uploaderMap = new HashMap<>(); for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) { final AirbyteStream stream = configStream.getStream(); - stream.setNamespace(BigQueryUtils.getDatasetId(config)); + if (StringUtils.isEmpty(stream.getNamespace())) { + stream.setNamespace(BigQueryUtils.getDatasetId(config)); + } final String streamName = stream.getName(); final UploaderConfig uploaderConfig = UploaderConfig .builder() diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java index 73988c853026..92fd666eb645 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java @@ -15,6 +15,7 @@ import java.util.List; import java.util.Map; import java.util.function.Consumer; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +47,9 @@ public void acceptTracked(final AirbyteMessage message) { lastStateMessage = message; outputRecordCollector.accept(message); } else if (message.getType() == Type.RECORD) { - message.getRecord().setNamespace(datasetId); + if (StringUtils.isEmpty(message.getRecord().getNamespace())) { + message.getRecord().setNamespace(datasetId); + } processRecord(message); } else { LOGGER.warn("Unexpected message: {}", message.getType()); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java index e2cf6c858da5..2d3d1bb92f41 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java @@ -263,7 +263,11 @@ private static String getFormattedBigQueryDateTime(final String dateTimeValue) { public static String getSchema(final JsonNode config, final ConfiguredAirbyteStream stream) { final String srcNamespace = stream.getStream().getNamespace(); final String schemaName = srcNamespace == null ? getDatasetId(config) : srcNamespace; - return NAME_TRANSFORMER.getNamespace(schemaName); + return sanitizeDatasetId(schemaName); + } + + public static String sanitizeDatasetId(String datasetId) { + return NAME_TRANSFORMER.getNamespace(datasetId); } public static JobInfo.WriteDisposition getWriteDisposition(final DestinationSyncMode syncMode) { diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index 2df856e21fdd..649f2a4f97bb 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -134,8 +134,9 @@ Now that you have set up the BigQuery destination connector, check out the follo | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------| -| 1.2.1 | 2022-09-14 | [#15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage | -| 1.2.0 | 2022-09-09 | [#14023](https://github.com/airbytehq/airbyte/pull/14023) | Cover arrays only if they are nested | +| 1.2.3 | 2022-09-22 | [#17054](https://github.com/airbytehq/airbyte/pull/17054) | Respect stream namespace | +| 1.2.1 | 2022-09-14 | [#15668](https://github.com/airbytehq/airbyte/pull/15668) | (bugged, do not use) Wrap logs in AirbyteLogMessage | +| 1.2.0 | 2022-09-09 | [#14023](https://github.com/airbytehq/airbyte/pull/14023) | (bugged, do not use) Cover arrays only if they are nested | | 1.1.16 | 2022-09-01 | [#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields) | | 1.1.15 | 2022-08-22 | [15787](https://github.com/airbytehq/airbyte/pull/15787) | Throw exception if job failed | | 1.1.14 | 2022-08-03 | [14784](https://github.com/airbytehq/airbyte/pull/14784) | Enabling Application Default Credentials | @@ -182,9 +183,10 @@ Now that you have set up the BigQuery destination connector, check out the follo | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------| -| 1.2.2 | 2022-09-14 | [15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage | -| 1.2.1 | 2022-09-10 | [16401](https://github.com/airbytehq/airbyte/pull/16401) | Wrapping string objects with TextNode | -| 1.2.0 | 2022-09-09 | [#14023](https://github.com/airbytehq/airbyte/pull/14023) | Cover arrays only if they are nested | +| 1.2.3 | 2022-09-22 | [#17054](https://github.com/airbytehq/airbyte/pull/17054) | Respect stream namespace | +| 1.2.2 | 2022-09-14 | [15668](https://github.com/airbytehq/airbyte/pull/15668) | (bugged, do not use) Wrap logs in AirbyteLogMessage | +| 1.2.1 | 2022-09-10 | [16401](https://github.com/airbytehq/airbyte/pull/16401) | (bugged, do not use) Wrapping string objects with TextNode | +| 1.2.0 | 2022-09-09 | [#14023](https://github.com/airbytehq/airbyte/pull/14023) | (bugged, do not use) Cover arrays only if they are nested | | 1.1.16 | 2022-09-01 | [#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields) | | 1.1.15 | 2022-08-03 | [14784](https://github.com/airbytehq/airbyte/pull/14784) | Enabling Application Default Credentials | | 1.1.14 | 2022-08-02 | [14801](https://github.com/airbytehq/airbyte/pull/14801) | Fix multiple log bindings |