diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java
index d3fceb40c291..1e1703e36484 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java
@@ -85,7 +85,7 @@ protected void startTracked() throws InterruptedException {
     if (use1s1t) {
       // TODO extract common logic with GCS record consumer + extract into a higher level class
       // For each stream, make sure that its corresponding final table exists.
-      for (StreamConfig stream : catalog.streams()) {
+      for (final StreamConfig stream : catalog.streams()) {
        final Optional existingTable = destinationHandler.findExistingTable(stream.id());
        if (existingTable.isEmpty()) {
          destinationHandler.execute(sqlGenerator.createTable(stream, ""));
@@ -111,9 +111,9 @@ protected void startTracked() throws InterruptedException {
      }

      // For streams in overwrite mode, truncate the raw table and create a tmp table.
-      // non-1s1t syncs actually overwrite the raw table at the end of of the sync, wo we only do this in
+      // non-1s1t syncs actually overwrite the raw table at the end of the sync, so we only do this in
      // 1s1t mode.
-      for (StreamConfig stream : catalog.streams()) {
+      for (final StreamConfig stream : catalog.streams()) {
        LOGGER.info("Stream {} has sync mode {}", stream.id(), stream.destinationSyncMode());
        final String suffix = overwriteStreamsWithTmpTable.get(stream.id());
        if (stream.destinationSyncMode() == DestinationSyncMode.OVERWRITE && suffix != null && !suffix.isEmpty()) {
@@ -126,6 +126,10 @@ protected void startTracked() throws InterruptedException {
          destinationHandler.execute(sqlGenerator.createTable(stream, suffix));
        }
      }
+
+      uploaderMap.forEach((streamId, uploader) -> {
+        uploader.createRawTable();
+      });
    }
  }

@@ -203,7 +207,7 @@ public void close(final boolean hasFailed) {

  private void doTypingAndDeduping(final StreamConfig stream) throws InterruptedException {
    if (use1s1t) {
-      String suffix;
+      final String suffix;
      suffix = overwriteStreamsWithTmpTable.getOrDefault(stream.id(), "");
      final String sql = sqlGenerator.updateTable(suffix, stream);
      destinationHandler.execute(sql);
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java
index c5709b46201b..390e2f7fdfd8 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java
@@ -107,13 +107,6 @@ protected void uploadData(final Consumer outputRecordCollector,
        LOGGER.info("Uploading data from the tmp table {} to the source table {}.", tmpTable.getTable(), table.getTable());
        uploadDataToTableFromTmpTable();
        LOGGER.info("Data is successfully loaded to the source table {}!", table.getTable());
-      } else {
-        // Otherwise, we just need to ensure that this table exists.
-        // TODO alter an existing raw table?
-        final Table rawTable = bigQuery.getTable(table);
-        if (rawTable == null) {
-          bigQuery.create(TableInfo.newBuilder(table, StandardTableDefinition.of(recordFormatter.getBigQuerySchema())).build());
-        }
      }

      outputRecordCollector.accept(lastStateMessage);
@@ -126,6 +119,18 @@ protected void uploadData(final Consumer outputRecordCollector,
    }
  }

+  public void createRawTable() {
+    // Ensure that this table exists.
+    // TODO alter an existing raw table?
+    final Table rawTable = bigQuery.getTable(table);
+    if (rawTable == null) {
+      LOGGER.info("Creating raw table {}.", table);
+      bigQuery.create(TableInfo.newBuilder(table, StandardTableDefinition.of(recordFormatter.getBigQuerySchema())).build());
+    } else {
+      LOGGER.info("Found raw table {}.", rawTable.getTableId());
+    }
+  }
+
  protected void dropTmpTable() {
    try {
      // clean up tmp tables;