Skip to content

Commit

Permalink
Destination Bigquery / Bigquery-denormalized: Only override dataset I…
Browse files Browse the repository at this point in the history
…D if stream namespace is null/empty (airbytehq#17054)

* stop overriding namespace?

* set namespace if needed

* also check for empty namespace

* version bump + changelog

* auto-bump connector version [ci skip]

* sanitize dataset id

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
2 people authored and robbinhan committed Sep 29, 2022
1 parent 61873e2 commit 7f281af
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
- name: BigQuery
destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 1.2.1
dockerImageTag: 1.2.3
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
icon: bigquery.svg
resourceRequirements:
Expand All @@ -40,7 +40,7 @@
- name: BigQuery (denormalized typed struct)
destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 1.2.2
dockerImageTag: 1.2.3
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
icon: bigquery.svg
resourceRequirements:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-bigquery:1.2.1"
- dockerImage: "airbyte/destination-bigquery:1.2.3"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
connectionSpecification:
Expand Down Expand Up @@ -495,7 +495,7 @@
- "overwrite"
- "append"
- "append_dedup"
- dockerImage: "airbyte/destination-bigquery-denormalized:1.2.2"
- dockerImage: "airbyte/destination-bigquery-denormalized:1.2.3"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.2.2
LABEL io.airbyte.version=1.2.3
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ protected void putStreamIntoUploaderMap(AirbyteStream stream,
UploaderConfig uploaderConfig,
Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap)
throws IOException {
Table existingTable =
uploaderConfig.getBigQuery().getTable(uploaderConfig.getConfigStream().getStream().getNamespace(), uploaderConfig.getTargetTableName());
String datasetId = BigQueryUtils.sanitizeDatasetId(uploaderConfig.getConfigStream().getStream().getNamespace());
Table existingTable = uploaderConfig.getBigQuery().getTable(datasetId, uploaderConfig.getTargetTableName());
BigQueryRecordFormatter formatter = uploaderConfig.getFormatter();

if (existingTable != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.2.1
LABEL io.airbyte.version=1.2.3
LABEL io.airbyte.name=airbyte/destination-bigquery
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
Expand Down Expand Up @@ -209,7 +210,9 @@ protected Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> getUp
final Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap = new HashMap<>();
for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) {
final AirbyteStream stream = configStream.getStream();
stream.setNamespace(BigQueryUtils.getDatasetId(config));
if (StringUtils.isEmpty(stream.getNamespace())) {
stream.setNamespace(BigQueryUtils.getDatasetId(config));
}
final String streamName = stream.getName();
final UploaderConfig uploaderConfig = UploaderConfig
.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -46,7 +47,9 @@ public void acceptTracked(final AirbyteMessage message) {
lastStateMessage = message;
outputRecordCollector.accept(message);
} else if (message.getType() == Type.RECORD) {
message.getRecord().setNamespace(datasetId);
if (StringUtils.isEmpty(message.getRecord().getNamespace())) {
message.getRecord().setNamespace(datasetId);
}
processRecord(message);
} else {
LOGGER.warn("Unexpected message: {}", message.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,11 @@ private static String getFormattedBigQueryDateTime(final String dateTimeValue) {
public static String getSchema(final JsonNode config, final ConfiguredAirbyteStream stream) {
final String srcNamespace = stream.getStream().getNamespace();
final String schemaName = srcNamespace == null ? getDatasetId(config) : srcNamespace;
return NAME_TRANSFORMER.getNamespace(schemaName);
return sanitizeDatasetId(schemaName);
}

public static String sanitizeDatasetId(String datasetId) {
return NAME_TRANSFORMER.getNamespace(datasetId);
}

public static JobInfo.WriteDisposition getWriteDisposition(final DestinationSyncMode syncMode) {
Expand Down
12 changes: 7 additions & 5 deletions docs/integrations/destinations/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ Now that you have set up the BigQuery destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------|
| 1.2.1 | 2022-09-14 | [#15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage |
| 1.2.0 | 2022-09-09 | [#14023](https://github.com/airbytehq/airbyte/pull/14023) | Cover arrays only if they are nested |
| 1.2.3 | 2022-09-22 | [#17054](https://github.com/airbytehq/airbyte/pull/17054) | Respect stream namespace |
| 1.2.1 | 2022-09-14 | [#15668](https://github.com/airbytehq/airbyte/pull/15668) | (bugged, do not use) Wrap logs in AirbyteLogMessage |
| 1.2.0 | 2022-09-09 | [#14023](https://github.com/airbytehq/airbyte/pull/14023) | (bugged, do not use) Cover arrays only if they are nested |
| 1.1.16 | 2022-09-01 | [#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields) |
| 1.1.15 | 2022-08-22 | [15787](https://github.com/airbytehq/airbyte/pull/15787) | Throw exception if job failed |
| 1.1.14 | 2022-08-03 | [14784](https://github.com/airbytehq/airbyte/pull/14784) | Enabling Application Default Credentials |
Expand Down Expand Up @@ -182,9 +183,10 @@ Now that you have set up the BigQuery destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------|
| 1.2.2 | 2022-09-14 | [15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage |
| 1.2.1 | 2022-09-10 | [16401](https://github.com/airbytehq/airbyte/pull/16401) | Wrapping string objects with TextNode |
| 1.2.0 | 2022-09-09 | [#14023](https://github.com/airbytehq/airbyte/pull/14023) | Cover arrays only if they are nested |
| 1.2.3 | 2022-09-22 | [#17054](https://github.com/airbytehq/airbyte/pull/17054) | Respect stream namespace |
| 1.2.2 | 2022-09-14 | [15668](https://github.com/airbytehq/airbyte/pull/15668) | (bugged, do not use) Wrap logs in AirbyteLogMessage |
| 1.2.1 | 2022-09-10 | [16401](https://github.com/airbytehq/airbyte/pull/16401) | (bugged, do not use) Wrapping string objects with TextNode |
| 1.2.0 | 2022-09-09 | [#14023](https://github.com/airbytehq/airbyte/pull/14023) | (bugged, do not use) Cover arrays only if they are nested |
| 1.1.16 | 2022-09-01 | [#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields) |
| 1.1.15 | 2022-08-03 | [14784](https://github.com/airbytehq/airbyte/pull/14784) | Enabling Application Default Credentials |
| 1.1.14 | 2022-08-02 | [14801](https://github.com/airbytehq/airbyte/pull/14801) | Fix multiple log bindings |
Expand Down

0 comments on commit 7f281af

Please sign in to comment.