diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 1b306a02917d..0cf8493a7c20 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -22,12 +22,12 @@ - destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 name: BigQuery dockerRepository: airbyte/destination-bigquery - dockerImageTag: 0.4.1 + dockerImageTag: 0.5.0 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery - destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496 name: BigQuery (denormalized typed struct) dockerRepository: airbyte/destination-bigquery-denormalized - dockerImageTag: 0.1.6 + dockerImageTag: 0.1.7 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery - destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a name: Google Cloud Storage (GCS) diff --git a/airbyte-integrations/connector-templates/source-java-jdbc/src/test/java/io/airbyte/integrations/source/{{snakeCase name}}/{{pascalCase name}}JdbcSourceAcceptanceTest.java.hbs b/airbyte-integrations/connector-templates/source-java-jdbc/src/test/java/io/airbyte/integrations/source/{{snakeCase name}}/{{pascalCase name}}JdbcSourceAcceptanceTest.java.hbs index 9e0e99ac6802..39ddacc2b40a 100644 --- a/airbyte-integrations/connector-templates/source-java-jdbc/src/test/java/io/airbyte/integrations/source/{{snakeCase name}}/{{pascalCase name}}JdbcSourceAcceptanceTest.java.hbs +++ b/airbyte-integrations/connector-templates/source-java-jdbc/src/test/java/io/airbyte/integrations/source/{{snakeCase name}}/{{pascalCase name}}JdbcSourceAcceptanceTest.java.hbs @@ -60,6 +60,12 @@ class {{pascalCase name}}JdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTe return {{pascalCase name}}Source.DRIVER_CLASS; } + @Override + public AbstractJdbcSource getJdbcSource() { + // TODO + return null; + } + @AfterAll static void cleanUp() { // TODO close the container. 
Ex: "container.close();" diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile index cc0756692987..2ad0b213627c 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.6 +LABEL io.airbyte.version=0.1.7 LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java index 29769ea0559c..d52da1ffe77b 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java @@ -90,8 +90,7 @@ protected JsonNode formatData(final FieldList fields, final JsonNode root) { .collect(Collectors.toList())); // "Array of Array of" (nested arrays) are not permitted by BigQuery ("Array of Record of Array of" - // is) - // Turn all "Array of" into "Array of Record of" instead + // is). Turn all "Array of" into "Array of Record of" instead return Jsons.jsonNode(ImmutableMap.of(BigQueryDenormalizedDestination.NESTED_ARRAY_FIELD, items)); } else { return root; diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index 456cef651eb0..d507eed69ceb 100644 --- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.4.1 +LABEL io.airbyte.version=0.5.0 LABEL io.airbyte.name=airbyte/destination-bigquery diff --git a/airbyte-integrations/connectors/destination-bigquery/build.gradle b/airbyte-integrations/connectors/destination-bigquery/build.gradle index 86bf75b9cc55..c30f258a5b59 100644 --- a/airbyte-integrations/connectors/destination-bigquery/build.gradle +++ b/airbyte-integrations/connectors/destination-bigquery/build.gradle @@ -13,6 +13,9 @@ dependencies { implementation 'com.google.cloud:google-cloud-bigquery:1.122.2' implementation 'org.apache.commons:commons-lang3:3.11' + // csv + implementation 'org.apache.commons:commons-csv:1.4' + implementation project(':airbyte-config:models') implementation project(':airbyte-integrations:bases:base-java') implementation project(':airbyte-protocol:models') diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index 4c3d1087e629..99a3568ec5f2 100644 --- 
a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -60,9 +60,7 @@ public class BigQueryDestination extends BaseConnector implements Destination { private static final com.google.cloud.bigquery.Schema SCHEMA = com.google.cloud.bigquery.Schema.of( Field.of(JavaBaseConstants.COLUMN_NAME_AB_ID, StandardSQLTypeName.STRING), - // GCS works with only date\datetime formats, so need to have it a string for a while - // https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv#data_types - Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.STRING), + Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP), Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING)); private final BigQuerySQLNameTransformer namingResolver; @@ -318,10 +316,10 @@ private boolean isKeepFilesInGcs(final JsonNode config) { if (loadingMethod != null && loadingMethod.get(BigQueryConsts.KEEP_GCS_FILES) != null && BigQueryConsts.KEEP_GCS_FILES_VAL .equals(loadingMethod.get(BigQueryConsts.KEEP_GCS_FILES).asText())) { - LOGGER.info("All tmp files GCS will be kept in bucket when migration is finished"); + LOGGER.info("All tmp files GCS will be kept in bucket when replication is finished"); return true; } else { - LOGGER.info("All tmp files will be removed from GCS when migration is finished"); + LOGGER.info("All tmp files will be removed from GCS when replication is finished"); return false; } } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java index dc293e176c27..ce725ab48484 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java @@ -14,11 +14,13 @@ import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.CopyJobConfiguration; import com.google.cloud.bigquery.CsvOptions; +import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.JobInfo; import com.google.cloud.bigquery.JobInfo.CreateDisposition; import com.google.cloud.bigquery.JobInfo.WriteDisposition; import com.google.cloud.bigquery.LoadJobConfiguration; +import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.bigquery.QueryParameterValue; import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.TableDataWriteChannel; @@ -27,6 +29,7 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; +import io.airbyte.commons.string.Strings; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; @@ -119,7 +122,7 @@ public void acceptTracked(final AirbyteMessage message) throws IOException { } else { // GCS uploading way, this data will be moved to bigquery in close 
method final GcsCsvWriter gcsCsvWriter = writer.getGcsCsvWriter(); - gcsCsvWriter.write(UUID.randomUUID(), recordMessage); + writeRecordToCsv(gcsCsvWriter, recordMessage); } } else { LOGGER.warn("Unexpected message: " + message.getType()); @@ -138,6 +141,23 @@ protected JsonNode formatRecord(final Schema schema, final AirbyteRecordMessage JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt)); } + protected void writeRecordToCsv(final GcsCsvWriter gcsCsvWriter, final AirbyteRecordMessage recordMessage) { + // Bigquery represents TIMESTAMP to the microsecond precision, so we convert to microseconds then + // use BQ helpers to string-format correctly. + final long emittedAtMicroseconds = TimeUnit.MICROSECONDS.convert(recordMessage.getEmittedAt(), TimeUnit.MILLISECONDS); + final String formattedEmittedAt = QueryParameterValue.timestamp(emittedAtMicroseconds).getValue(); + final JsonNode formattedData = StandardNameTransformer.formatJsonPath(recordMessage.getData()); + try { + gcsCsvWriter.getCsvPrinter().printRecord( + UUID.randomUUID().toString(), + formattedEmittedAt, + Jsons.serialize(formattedData)); + } catch (IOException e) { + e.printStackTrace(); + LOGGER.warn("An error occurred writing CSV file."); + } + } + @Override public void close(final boolean hasFailed) { LOGGER.info("Started closing all connections"); @@ -181,7 +201,7 @@ private void closeGcsStreamsAndCopyDataToBigQuery(final boolean hasFailed) { try { loadCsvFromGcsTruncate(pair); } catch (final Exception e) { - LOGGER.error("Failed to load data from GCS CSV file to BibQuery tmp table with reason: " + e.getMessage()); + LOGGER.error("Failed to load data from GCS CSV file to BigQuery tmp table with reason: " + e.getMessage()); throw new RuntimeException(e); } }); @@ -198,7 +218,7 @@ private void loadCsvFromGcsTruncate(final BigQueryWriteConfig bigQueryWriteConfi // Initialize client that will be used to send requests. This client only needs to be created // once, and can be reused for multiple requests. - LOGGER.info(String.format("Started coping data from %s GCS csv file to %s tmp BigQuery table with schema: \n %s", + LOGGER.info(String.format("Started copying data from %s GCS csv file to %s tmp BigQuery table with schema: \n %s", csvFile, tmpTable, schema)); final CsvOptions csvOptions = CsvOptions.newBuilder().setEncoding(UTF8).setSkipLeadingRows(1).build(); @@ -215,7 +235,7 @@ private void loadCsvFromGcsTruncate(final BigQueryWriteConfig bigQueryWriteConfi // Load the table final Job loadJob = bigquery.create(JobInfo.of(configuration)); - LOGGER.info("Crated a new job GCS csv file to tmp BigQuery table: " + loadJob); + LOGGER.info("Created a new job GCS csv file to tmp BigQuery table: " + loadJob); LOGGER.info("Waiting for job to complete..."); // Load data from a GCS parquet file into the table @@ -272,15 +292,20 @@ private void closeNormalBigqueryStreams(final boolean hasFailed) { })); if (!hasFailed) { - LOGGER.info("Migration finished with no explicit errors. Copying data from tmp tables to permanent"); + LOGGER.info("Replication finished with no explicit errors. 
Copying data from tmp tables to permanent"); writeConfigs.values() .forEach( - bigQueryWriteConfig -> copyTable(bigquery, bigQueryWriteConfig.getTmpTable(), bigQueryWriteConfig.getTable(), - bigQueryWriteConfig.getSyncMode())); + bigQueryWriteConfig -> { + if (bigQueryWriteConfig.getSyncMode().equals(WriteDisposition.WRITE_APPEND)) { + partitionIfUnpartitioned(bigQueryWriteConfig, bigquery, bigQueryWriteConfig.getTable()); + } + copyTable(bigquery, bigQueryWriteConfig.getTmpTable(), bigQueryWriteConfig.getTable(), + bigQueryWriteConfig.getSyncMode()); + }); // BQ is still all or nothing if a failure happens in the destination. outputRecordCollector.accept(lastStateMessage); } else { - LOGGER.warn("Had errors while migrations"); + LOGGER.warn("Had errors while replicating"); } } finally { // clean up tmp tables; @@ -324,7 +349,6 @@ private static void copyTable( final TableId sourceTableId, final TableId destinationTableId, final WriteDisposition syncMode) { - final CopyJobConfiguration configuration = CopyJobConfiguration.newBuilder(destinationTableId, sourceTableId) .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) .setWriteDisposition(syncMode) @@ -336,7 +360,67 @@ private static void copyTable( LOGGER.error("Failed on copy tables with error:" + job.getStatus()); throw new RuntimeException("BigQuery was unable to copy table due to an error: \n" + job.getStatus().getError()); } - LOGGER.info("successfully copied tmp table: {} to final table: {}", sourceTableId, destinationTableId); + LOGGER.info("successfully copied table: {} to table: {}", sourceTableId, destinationTableId); + } + + private void partitionIfUnpartitioned(final BigQueryWriteConfig bigQueryWriteConfig, + final BigQuery bigquery, + final TableId destinationTableId) { + try { + final QueryJobConfiguration queryConfig = QueryJobConfiguration + .newBuilder( + String.format("SELECT max(is_partitioning_column) as is_partitioned FROM `%s.%s.INFORMATION_SCHEMA.COLUMNS` WHERE TABLE_NAME = '%s';", + bigquery.getOptions().getProjectId(), + destinationTableId.getDataset(), + destinationTableId.getTable())) + .setUseLegacySql(false) + .build(); + final ImmutablePair result = BigQueryUtils.executeQuery(bigquery, queryConfig); + result.getLeft().getQueryResults().getValues().forEach(row -> { + if (!row.get("is_partitioned").isNull() && row.get("is_partitioned").getStringValue().equals("NO")) { + LOGGER.info("Partitioning existing destination table {}", destinationTableId); + final String tmpPartitionTable = Strings.addRandomSuffix("_airbyte_partitioned_table", "_", 5); + final TableId tmpPartitionTableId = TableId.of(destinationTableId.getDataset(), tmpPartitionTable); + // make sure tmpPartitionTable does not already exist + bigquery.delete(tmpPartitionTableId); + // Use BigQuery SQL to copy because java api copy jobs does not support creating a table from a + // select query, see: + // https://cloud.google.com/bigquery/docs/creating-partitioned-tables#create_a_partitioned_table_from_a_query_result + final QueryJobConfiguration partitionQuery = QueryJobConfiguration + .newBuilder( + getCreatePartitionedTableFromSelectQuery(bigQueryWriteConfig.getSchema(), bigquery.getOptions().getProjectId(), destinationTableId, + tmpPartitionTable)) + .setUseLegacySql(false) + .build(); + BigQueryUtils.executeQuery(bigquery, partitionQuery); + // Copying data from a partitioned tmp table into an existing non-partitioned table does not make it + // partitioned... 
thus, we force re-create from scratch by completely deleting and creating new + // table. + bigquery.delete(destinationTableId); + copyTable(bigquery, tmpPartitionTableId, destinationTableId, WriteDisposition.WRITE_EMPTY); + bigquery.delete(tmpPartitionTableId); + } + }); + } catch (final InterruptedException e) { + LOGGER.warn("Had errors while partitioning: ", e); + } + } + + protected String getCreatePartitionedTableFromSelectQuery(final Schema schema, + final String projectId, + final TableId destinationTableId, + final String tmpPartitionTable) { + return String.format("create table `%s.%s.%s` (", projectId, destinationTableId.getDataset(), tmpPartitionTable) + + schema.getFields().stream() + .map(field -> String.format("%s %s", field.getName(), field.getType())) + .collect(Collectors.joining(", ")) + + ") partition by date(" + + JavaBaseConstants.COLUMN_NAME_EMITTED_AT + + ") as select " + + schema.getFields().stream() + .map(Field::getName) + .collect(Collectors.joining(", ")) + + String.format(" from `%s.%s.%s`", projectId, destinationTableId.getDataset(), destinationTableId.getTable()); } private void printHeapMemoryConsumption() { diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java index 1f065b61785e..d5fc8a397cb0 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.Clustering; import com.google.cloud.bigquery.Dataset; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Job; @@ -18,8 +19,11 @@ import com.google.cloud.bigquery.TableDefinition; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.TimePartitioning; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.JavaBaseConstants; import java.util.Set; import java.util.UUID; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -69,7 +73,7 @@ static void createSchemaAndTableIfNeeded(final BigQuery bigquery, createSchemaTable(bigquery, schemaName, datasetLocation); existingSchemas.add(schemaName); } - BigQueryUtils.createTable(bigquery, schemaName, tmpTableName, schema); + BigQueryUtils.createPartitionedTable(bigquery, schemaName, tmpTableName, schema); } static void createSchemaTable(final BigQuery bigquery, final String datasetId, final String datasetLocation) { @@ -80,18 +84,32 @@ static void createSchemaTable(final BigQuery bigquery, final String datasetId, f } } - // https://cloud.google.com/bigquery/docs/tables#create-table - static void createTable(final BigQuery bigquery, final String datasetName, final String tableName, final Schema schema) { + // https://cloud.google.com/bigquery/docs/creating-partitioned-tables#java + static void createPartitionedTable(final BigQuery bigquery, final String datasetName, final String tableName, final Schema schema) { try { final TableId 
tableId = TableId.of(datasetName, tableName); - final TableDefinition tableDefinition = StandardTableDefinition.of(schema); + + final TimePartitioning partitioning = TimePartitioning.newBuilder(TimePartitioning.Type.DAY) + .setField(JavaBaseConstants.COLUMN_NAME_EMITTED_AT) + .build(); + + final Clustering clustering = Clustering.newBuilder() + .setFields(ImmutableList.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) + .build(); + + final StandardTableDefinition tableDefinition = + StandardTableDefinition.newBuilder() + .setSchema(schema) + .setTimePartitioning(partitioning) + .setClustering(clustering) + .build(); final TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build(); bigquery.create(tableInfo); - LOGGER.info("Table: {} created successfully", tableId); - } catch (final BigQueryException e) { - LOGGER.info("Table was not created. \n", e); + LOGGER.info("Partitioned Table: {} created successfully", tableId); + } catch (BigQueryException e) { + LOGGER.info("Partitioned table was not created. \n" + e); } } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java index 5c2475c81ee8..d364c479715b 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java @@ -6,6 +6,7 @@ import static java.util.stream.Collectors.toList; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.doThrow; @@ -18,7 +19,12 @@ import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.Dataset; import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; @@ -39,6 +45,7 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.protocol.models.DestinationSyncMode; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaPrimitive; import java.io.ByteArrayInputStream; @@ -50,6 +57,7 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -130,7 +138,8 @@ void setup(final TestInfo info) throws IOException { CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, datasetId, io.airbyte.protocol.models.Field.of("name", JsonSchemaPrimitive.STRING), 
io.airbyte.protocol.models.Field - .of("id", JsonSchemaPrimitive.STRING)), + .of("id", JsonSchemaPrimitive.STRING)) + .withDestinationSyncMode(DestinationSyncMode.APPEND), CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, datasetId, Field.of("goal", JsonSchemaPrimitive.STRING)))); final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build(); @@ -296,4 +305,68 @@ private List retrieveRecords(final String tableName) throws Exception .collect(Collectors.toList()); } + @Test + void testWritePartitionOverUnpartitioned() throws Exception { + final String raw_table_name = String.format("_airbyte_raw_%s", USERS_STREAM_NAME); + createUnpartitionedTable(bigquery, dataset, raw_table_name); + assertFalse(isTablePartitioned(bigquery, dataset, raw_table_name)); + final BigQueryDestination destination = new BigQueryDestination(); + final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); + + consumer.accept(MESSAGE_USERS1); + consumer.accept(MESSAGE_TASKS1); + consumer.accept(MESSAGE_USERS2); + consumer.accept(MESSAGE_TASKS2); + consumer.accept(MESSAGE_STATE); + consumer.close(); + + final List usersActual = retrieveRecords(NAMING_RESOLVER.getRawTableName(USERS_STREAM_NAME)); + final List expectedUsersJson = Lists.newArrayList(MESSAGE_USERS1.getRecord().getData(), MESSAGE_USERS2.getRecord().getData()); + assertEquals(expectedUsersJson.size(), usersActual.size()); + assertTrue(expectedUsersJson.containsAll(usersActual) && usersActual.containsAll(expectedUsersJson)); + + final List tasksActual = retrieveRecords(NAMING_RESOLVER.getRawTableName(TASKS_STREAM_NAME)); + final List expectedTasksJson = Lists.newArrayList(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData()); + assertEquals(expectedTasksJson.size(), tasksActual.size()); + assertTrue(expectedTasksJson.containsAll(tasksActual) && tasksActual.containsAll(expectedTasksJson)); + + assertTmpTablesNotPresent(catalog.getStreams() + .stream() + .map(ConfiguredAirbyteStream::getStream) + .map(AirbyteStream::getName) + .collect(Collectors.toList())); + assertTrue(isTablePartitioned(bigquery, dataset, raw_table_name)); + } + + private void createUnpartitionedTable(final BigQuery bigquery, final Dataset dataset, final String tableName) { + final TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName); + bigquery.delete(tableId); + final com.google.cloud.bigquery.Schema schema = com.google.cloud.bigquery.Schema.of( + com.google.cloud.bigquery.Field.of(JavaBaseConstants.COLUMN_NAME_AB_ID, StandardSQLTypeName.STRING), + com.google.cloud.bigquery.Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP), + com.google.cloud.bigquery.Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING)); + final StandardTableDefinition tableDefinition = + StandardTableDefinition.newBuilder() + .setSchema(schema) + .build(); + final TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build(); + bigquery.create(tableInfo); + } + + private boolean isTablePartitioned(final BigQuery bigquery, final Dataset dataset, final String tableName) throws InterruptedException { + final QueryJobConfiguration queryConfig = QueryJobConfiguration + .newBuilder( + String.format("SELECT max(is_partitioning_column) as is_partitioned FROM `%s.%s.INFORMATION_SCHEMA.COLUMNS` WHERE TABLE_NAME = '%s';", + bigquery.getOptions().getProjectId(), + dataset.getDatasetId().getDataset(), + 
tableName)) + .setUseLegacySql(false) + .build(); + final ImmutablePair result = BigQueryUtils.executeQuery(bigquery, queryConfig); + for (final com.google.cloud.bigquery.FieldValueList row : result.getLeft().getQueryResults().getValues()) { + return !row.get("is_partitioned").isNull() && row.get("is_partitioned").getStringValue().equals("YES"); + } + return false; + } + } diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java index 38f22a5bd10c..4551a0b07dc1 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java @@ -86,4 +86,8 @@ public String getGcsCsvFileLocation() { return gcsCsvFileLocation; } + public CSVPrinter getCsvPrinter() { + return csvPrinter; + } + } diff --git a/airbyte-integrations/connectors/source-mongodb-strict-encrypt/build.gradle b/airbyte-integrations/connectors/source-mongodb-strict-encrypt/build.gradle index 1bae2739b761..f6ab4c01d02a 100644 --- a/airbyte-integrations/connectors/source-mongodb-strict-encrypt/build.gradle +++ b/airbyte-integrations/connectors/source-mongodb-strict-encrypt/build.gradle @@ -16,6 +16,7 @@ dependencies { implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) implementation project(':airbyte-integrations:connectors:source-relational-db') implementation project(':airbyte-integrations:connectors:source-mongodb-v2') + implementation 'org.mongodb:mongodb-driver-sync:4.3.0' integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test') integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-mongodb-strict-encrypt') diff --git a/airbyte-integrations/connectors/source-scaffold-java-jdbc/src/test/java/io/airbyte/integrations/source/scaffold_java_jdbc/ScaffoldJavaJdbcJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-scaffold-java-jdbc/src/test/java/io/airbyte/integrations/source/scaffold_java_jdbc/ScaffoldJavaJdbcJdbcSourceAcceptanceTest.java index e023ddf53c13..24cae6be20e6 100644 --- a/airbyte-integrations/connectors/source-scaffold-java-jdbc/src/test/java/io/airbyte/integrations/source/scaffold_java_jdbc/ScaffoldJavaJdbcJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-scaffold-java-jdbc/src/test/java/io/airbyte/integrations/source/scaffold_java_jdbc/ScaffoldJavaJdbcJdbcSourceAcceptanceTest.java @@ -60,6 +60,12 @@ public String getDriverClass() { return ScaffoldJavaJdbcSource.DRIVER_CLASS; } + @Override + public AbstractJdbcSource getJdbcSource() { + // TODO + return null; + } + @AfterAll static void cleanUp() { // TODO close the container. 
Ex: "container.close();" diff --git a/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeSourceDatatypeTest.java b/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeSourceDatatypeTest.java index 98873ee7c82f..1f732de4acd8 100644 --- a/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeSourceDatatypeTest.java @@ -90,8 +90,10 @@ protected void initTests() { TestDataHolder.builder() .sourceType("NUMBER") .airbyteType(JsonSchemaPrimitive.NUMBER) - .addInsertValues("null", "99999999999999999999999999999999999999", "-99999999999999999999999999999999999999","9223372036854775807", "-9223372036854775808") - .addExpectedValues(null, "99999999999999999999999999999999999999", "-99999999999999999999999999999999999999","9223372036854775807", "-9223372036854775808") + .addInsertValues("null", "99999999999999999999999999999999999999", "-99999999999999999999999999999999999999", "9223372036854775807", + "-9223372036854775808") + .addExpectedValues(null, "99999999999999999999999999999999999999", "-99999999999999999999999999999999999999", "9223372036854775807", + "-9223372036854775808") .build()); addDataTypeTestData( TestDataHolder.builder() @@ -104,8 +106,10 @@ protected void initTests() { TestDataHolder.builder() .sourceType("NUMERIC") .airbyteType(JsonSchemaPrimitive.NUMBER) - .addInsertValues("null", "99999999999999999999999999999999999999", "-99999999999999999999999999999999999999", "9223372036854775807", "-9223372036854775808") - .addExpectedValues(null, "99999999999999999999999999999999999999", "-99999999999999999999999999999999999999", "9223372036854775807", "-9223372036854775808") + .addInsertValues("null", "99999999999999999999999999999999999999", "-99999999999999999999999999999999999999", "9223372036854775807", + "-9223372036854775808") + .addExpectedValues(null, "99999999999999999999999999999999999999", "-99999999999999999999999999999999999999", "9223372036854775807", + "-9223372036854775808") .build()); addDataTypeTestData( TestDataHolder.builder() diff --git a/airbyte-integrations/connectors/source-zuora/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-zuora/integration_tests/invalid_config.json index 7a79ec882637..5529e73ba380 100644 --- a/airbyte-integrations/connectors/source-zuora/integration_tests/invalid_config.json +++ b/airbyte-integrations/connectors/source-zuora/integration_tests/invalid_config.json @@ -5,4 +5,4 @@ "data_query": "Live", "client_id": "some_client_id", "client_secret": "some_client_secret" -} \ No newline at end of file +} diff --git a/airbyte-integrations/connectors/source-zuora/source_zuora/spec.json b/airbyte-integrations/connectors/source-zuora/source_zuora/spec.json index bdef4e120676..dbf8498f0411 100644 --- a/airbyte-integrations/connectors/source-zuora/source_zuora/spec.json +++ b/airbyte-integrations/connectors/source-zuora/source_zuora/spec.json @@ -4,7 +4,13 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Zuora Connector Configuration", "type": "object", - "required": ["start_date", "tenant_endpoint", "data_query", "client_id", "client_secret"], + "required": [ 
+ "start_date", + "tenant_endpoint", + "data_query", + "client_id", + "client_secret" + ], "properties": { "start_date": { "type": "string", @@ -40,10 +46,7 @@ "title": "Data Query Type", "type": "string", "description": "Choose between `Live`, or `Unlimited` - the optimized, replicated database at 12 hours freshness for high volume extraction Link", - "enum": [ - "Live", - "Unlimited" - ], + "enum": ["Live", "Unlimited"], "default": "Live" }, "client_id": { diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index ba6974883afe..3b691446003a 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -30,9 +30,12 @@ Check out common troubleshooting issues for the BigQuery destination connector o Each stream will be output into its own table in BigQuery. Each table will contain 3 columns: * `_airbyte_ab_id`: a uuid assigned by Airbyte to each event that is processed. The column type in BigQuery is `String`. -* `_airbyte_emitted_at`: a timestamp representing when the event was pulled from the data source. The column type in BigQuery is `String`. Due to a Google [limitations](https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv#data_types) for data migration from GCs to BigQuery by its native job - the timestamp \(seconds from 1970' can't be used\). Only date format, so only String is accepted for us in this case. +* `_airbyte_emitted_at`: a timestamp representing when the event was pulled from the data source. The column type in BigQuery is `Timestamp`. * `_airbyte_data`: a json blob representing with the event data. The column type in BigQuery is `String`. +The output tables from the BigQuery destination are partitioned and clustered by the Time-unit column `_airbyte_emitted_at` at a daily granularity. Partitions boundaries are based on UTC time. +This is useful to limit the number of partitions scanned when querying these partitioned tables, by using a predicate filter (a WHERE clause). Filters on the partitioning column will be used to prune the partitions and reduce the query cost. (The parameter "Require partition filter" is not enabled by Airbyte, but you may toggle this by updating the produced tables if you wish so) + ## Getting Started \(Airbyte Open-Source / Airbyte Cloud\) #### Requirements @@ -89,7 +92,11 @@ You should now have all the requirements needed to configure BigQuery as a desti * **Dataset Location** * **Dataset ID**: the name of the schema where the tables will be created. * **Service Account Key**: the contents of your Service Account Key JSON file + +Additional options can also be customized: + * **Google BigQuery client chunk size**: Google BigQuery client's chunk\(buffer\) size \(MIN=1, MAX = 15\) for each table. The default 15MiB value is used if not set explicitly. It's recommended to decrease value for big data sets migration for less HEAP memory consumption and avoiding crashes. For more details refer to [https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html](https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html) +* **Transformation Priority**: configure the priority of queries run for transformations. Refer to [https://cloud.google.com/bigquery/docs/running-queries](https://cloud.google.com/bigquery/docs/running-queries). 
By default, Airbyte runs interactive query jobs on BigQuery, which means that the query is executed as soon as possible and counts towards daily concurrent quotas and limits. If set to use batch queries on your behalf, BigQuery starts the query as soon as idle resources are available in the BigQuery shared resource pool. This usually occurs within a few minutes. If BigQuery hasn't started the query within 24 hours, BigQuery changes the job priority to interactive. Batch queries don't count towards your concurrent rate limit, which can make it easier to start many queries at once. Once you've configured BigQuery as a destination, delete the Service Account Key from your computer. @@ -148,7 +155,8 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | -| 0.4.0 | 2021-10-04 | [\#6733](https://github.com/airbytehq/airbyte/issues/6733) | Support dataset starting with numbers | +| 0.5.0 | 2021-10-26 | [\#7240](https://github.com/airbytehq/airbyte/issues/7240) | Output partitioned/clustered tables | +| 0.4.1 | 2021-10-04 | [\#6733](https://github.com/airbytehq/airbyte/issues/6733) | Support dataset starting with numbers | | 0.4.0 | 2021-08-26 | [\#5296](https://github.com/airbytehq/airbyte/issues/5296) | Added GCS Staging uploading option | | 0.3.12 | 2021-08-03 | [\#3549](https://github.com/airbytehq/airbyte/issues/3549) | Add optional arg to make a possibility to change the BigQuery client's chunk\buffer size | | 0.3.11 | 2021-07-30 | [\#5125](https://github.com/airbytehq/airbyte/pull/5125) | Enable `additionalPropertities` in spec.json | @@ -161,6 +169,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.7 | 2021-10-26 | [\#7240](https://github.com/airbytehq/airbyte/issues/7240) | Output partitioned/clustered tables | | 0.1.6 | 2021-09-16 | [\#6145](https://github.com/airbytehq/airbyte/pull/6145) | BigQuery Denormalized support for date, datetime & timestamp types through the json "format" key | | 0.1.5 | 2021-09-07 | [\#5881](https://github.com/airbytehq/airbyte/pull/5881) | BigQuery Denormalized NPE fix | | 0.1.4 | 2021-09-04 | [\#5813](https://github.com/airbytehq/airbyte/pull/5813) | fix Stackoverflow error when receive a schema from source where "Array" type doesn't contain a required "items" element |
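A few illustrative queries (not part of the patch; the project, dataset, and table names below are hypothetical placeholders) may help when reviewing the partitioning behavior introduced here. The `partitionIfUnpartitioned` step decides whether an existing destination table is already partitioned by querying `INFORMATION_SCHEMA.COLUMNS`; for a raw table `_airbyte_raw_users`, the generated check is shaped like this:

```sql
-- Sketch of the partition check issued by partitionIfUnpartitioned():
-- returns 'YES' when the table already has a partitioning column, 'NO' otherwise.
SELECT max(is_partitioning_column) AS is_partitioned
FROM `my-project.my_dataset.INFORMATION_SCHEMA.COLUMNS`
WHERE TABLE_NAME = '_airbyte_raw_users';
```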
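`getCreatePartitionedTableFromSelectQuery` then rebuilds the table as a day-partitioned copy via a `CREATE TABLE ... PARTITION BY ... AS SELECT` statement. For the three-column raw schema this destination writes, the generated DDL should be roughly equivalent to the following sketch (the random `_airbyte_partitioned_table` suffix is made up):

```sql
-- Sketch of the DDL built by getCreatePartitionedTableFromSelectQuery():
-- creates a day-partitioned copy of the existing unpartitioned destination table.
CREATE TABLE `my-project.my_dataset._airbyte_partitioned_table_abcde` (
  _airbyte_ab_id STRING,
  _airbyte_emitted_at TIMESTAMP,
  _airbyte_data STRING
)
PARTITION BY DATE(_airbyte_emitted_at)
AS SELECT _airbyte_ab_id, _airbyte_emitted_at, _airbyte_data
FROM `my-project.my_dataset._airbyte_raw_users`;
```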
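The `writeRecordToCsv` change converts `emittedAt` from epoch milliseconds to microseconds before formatting it with `QueryParameterValue.timestamp`, so the CSV value loads cleanly into the new `TIMESTAMP` column. A quick sanity check of that conversion from the BigQuery console (the literal is an arbitrary example value, not taken from the patch):

```sql
-- 1635264000000 ms since epoch, scaled to microseconds, round-trips to the expected instant.
SELECT TIMESTAMP_MICROS(1635264000000 * 1000) AS _airbyte_emitted_at;
-- => 2021-10-26 16:00:00 UTC
```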
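Finally, as the updated `bigquery.md` explains, a predicate on `_airbyte_emitted_at` lets BigQuery prune partitions and scan less data; for example, against the same hypothetical raw table:

```sql
-- Filtering on the partitioning column prunes daily partitions and reduces scanned bytes.
SELECT _airbyte_data
FROM `my-project.my_dataset._airbyte_raw_users`
WHERE _airbyte_emitted_at >= TIMESTAMP('2021-10-26')
  AND _airbyte_emitted_at < TIMESTAMP('2021-10-27');
```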