
🎉 BigQuery destinations with partitioned/clustered keys #7240

Merged: 13 commits, Oct 25, 2021
@@ -22,12 +22,12 @@
- destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
name: BigQuery
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 0.4.1
dockerImageTag: 0.5.0
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
name: BigQuery (denormalized typed struct)
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 0.1.6
dockerImageTag: 0.1.7
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
name: Google Cloud Storage (GCS)
@@ -60,6 +60,12 @@ class {{pascalCase name}}JdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTe
return {{pascalCase name}}Source.DRIVER_CLASS;
}

@Override
public AbstractJdbcSource getJdbcSource() {
// TODO
return null;
}

@AfterAll
static void cleanUp() {
// TODO close the container. Ex: "container.close();"
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
@@ -90,8 +90,7 @@ protected JsonNode formatData(final FieldList fields, final JsonNode root) {
.collect(Collectors.toList()));

// "Array of Array of" (nested arrays) are not permitted by BigQuery ("Array of Record of Array of"
// is)
// Turn all "Array of" into "Array of Record of" instead
// is). Turn all "Array of" into "Array of Record of" instead
return Jsons.jsonNode(ImmutableMap.of(BigQueryDenormalizedDestination.NESTED_ARRAY_FIELD, items));
} else {
return root;
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.4.1
LABEL io.airbyte.version=0.5.0
LABEL io.airbyte.name=airbyte/destination-bigquery
@@ -13,6 +13,9 @@ dependencies {
implementation 'com.google.cloud:google-cloud-bigquery:1.122.2'
implementation 'org.apache.commons:commons-lang3:3.11'

// csv
implementation 'org.apache.commons:commons-csv:1.4'

implementation project(':airbyte-config:models')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:models')
@@ -60,9 +60,7 @@ public class BigQueryDestination extends BaseConnector implements Destination {

private static final com.google.cloud.bigquery.Schema SCHEMA = com.google.cloud.bigquery.Schema.of(
Field.of(JavaBaseConstants.COLUMN_NAME_AB_ID, StandardSQLTypeName.STRING),
// GCS works with only date\datetime formats, so need to have it a string for a while
// https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv#data_types
Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.STRING),
Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP),
Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING));

private final BigQuerySQLNameTransformer namingResolver;
@@ -318,10 +316,10 @@ private boolean isKeepFilesInGcs(final JsonNode config) {
if (loadingMethod != null && loadingMethod.get(BigQueryConsts.KEEP_GCS_FILES) != null
&& BigQueryConsts.KEEP_GCS_FILES_VAL
.equals(loadingMethod.get(BigQueryConsts.KEEP_GCS_FILES).asText())) {
LOGGER.info("All tmp files GCS will be kept in bucket when migration is finished");
LOGGER.info("All tmp files GCS will be kept in bucket when replication is finished");
return true;
} else {
LOGGER.info("All tmp files will be removed from GCS when migration is finished");
LOGGER.info("All tmp files will be removed from GCS when replication is finished");
return false;
}
}
@@ -14,11 +14,13 @@
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.CopyJobConfiguration;
import com.google.cloud.bigquery.CsvOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobInfo.CreateDisposition;
import com.google.cloud.bigquery.JobInfo.WriteDisposition;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryParameterValue;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableDataWriteChannel;
@@ -27,6 +29,7 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.string.Strings;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
@@ -119,7 +122,7 @@ public void acceptTracked(final AirbyteMessage message) throws IOException {
} else {
// GCS uploading way, this data will be moved to bigquery in close method
final GcsCsvWriter gcsCsvWriter = writer.getGcsCsvWriter();
gcsCsvWriter.write(UUID.randomUUID(), recordMessage);
writeRecordToCsv(gcsCsvWriter, recordMessage);
}
} else {
LOGGER.warn("Unexpected message: " + message.getType());
@@ -138,6 +141,23 @@ protected JsonNode formatRecord(final Schema schema, final AirbyteRecordMessage
JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt));
}

protected void writeRecordToCsv(final GcsCsvWriter gcsCsvWriter, final AirbyteRecordMessage recordMessage) {
// Bigquery represents TIMESTAMP to the microsecond precision, so we convert to microseconds then
// use BQ helpers to string-format correctly.
final long emittedAtMicroseconds = TimeUnit.MICROSECONDS.convert(recordMessage.getEmittedAt(), TimeUnit.MILLISECONDS);
[Review comment from @ChristopheDuong (Contributor, Author), Oct 22, 2021]
In this other PR, it's converted into seconds instead of microseconds? These changes should be somehow equivalent, right?
WDYT @etsybaev @andresbravog?
https://github.com/airbytehq/airbyte/pull/5981/files#r734312334

final String formattedEmittedAt = QueryParameterValue.timestamp(emittedAtMicroseconds).getValue();
final JsonNode formattedData = StandardNameTransformer.formatJsonPath(recordMessage.getData());
try {
gcsCsvWriter.getCsvPrinter().printRecord(
UUID.randomUUID().toString(),
formattedEmittedAt,
Jsons.serialize(formattedData));
} catch (IOException e) {
e.printStackTrace();
LOGGER.warn("An error occurred writing CSV file.");
}
}
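On the review question above (seconds vs. microseconds): QueryParameterValue.timestamp(Long) in the BigQuery Java client interprets its argument as microseconds since the epoch, which is what the conversion in writeRecordToCsv produces. A minimal standalone sketch of that conversion follows; the emitted-at value is invented and this is not connector code.

```java
import com.google.cloud.bigquery.QueryParameterValue;
import java.util.concurrent.TimeUnit;

public class EmittedAtFormatSketch {

  public static void main(String[] args) {
    // An Airbyte record's emittedAt is epoch milliseconds; this value is made up.
    final long emittedAtMillis = 1_635_000_000_123L;

    // QueryParameterValue.timestamp(Long) expects microseconds since the epoch,
    // so scale up before asking it to format.
    final long emittedAtMicros = TimeUnit.MICROSECONDS.convert(emittedAtMillis, TimeUnit.MILLISECONDS);
    final String formatted = QueryParameterValue.timestamp(emittedAtMicros).getValue();

    // Prints a BigQuery TIMESTAMP literal (roughly "2021-10-23 14:40:00.123000+00:00"),
    // i.e. the string written into the CSV column that is loaded into _airbyte_emitted_at.
    System.out.println(formatted);
  }
}
```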

@Override
public void close(final boolean hasFailed) {
LOGGER.info("Started closing all connections");
@@ -181,7 +201,7 @@ private void closeGcsStreamsAndCopyDataToBigQuery(final boolean hasFailed) {
try {
loadCsvFromGcsTruncate(pair);
} catch (final Exception e) {
LOGGER.error("Failed to load data from GCS CSV file to BibQuery tmp table with reason: " + e.getMessage());
LOGGER.error("Failed to load data from GCS CSV file to BigQuery tmp table with reason: " + e.getMessage());
throw new RuntimeException(e);
}
});
@@ -198,7 +218,7 @@ private void loadCsvFromGcsTruncate(final BigQueryWriteConfig bigQueryWriteConfi

// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests.
LOGGER.info(String.format("Started coping data from %s GCS csv file to %s tmp BigQuery table with schema: \n %s",
LOGGER.info(String.format("Started copying data from %s GCS csv file to %s tmp BigQuery table with schema: \n %s",
csvFile, tmpTable, schema));

final CsvOptions csvOptions = CsvOptions.newBuilder().setEncoding(UTF8).setSkipLeadingRows(1).build();
@@ -215,7 +235,7 @@ private void loadCsvFromGcsTruncate(final BigQueryWriteConfi
// Load the table
final Job loadJob = bigquery.create(JobInfo.of(configuration));

LOGGER.info("Crated a new job GCS csv file to tmp BigQuery table: " + loadJob);
LOGGER.info("Created a new job GCS csv file to tmp BigQuery table: " + loadJob);
LOGGER.info("Waiting for job to complete...");

// Load data from a GCS parquet file into the table
@@ -272,15 +292,20 @@ private void closeNormalBigqueryStreams(final boolean hasFailed) {
}));

if (!hasFailed) {
LOGGER.info("Migration finished with no explicit errors. Copying data from tmp tables to permanent");
LOGGER.info("Replication finished with no explicit errors. Copying data from tmp tables to permanent");
writeConfigs.values()
.forEach(
bigQueryWriteConfig -> copyTable(bigquery, bigQueryWriteConfig.getTmpTable(), bigQueryWriteConfig.getTable(),
bigQueryWriteConfig.getSyncMode()));
bigQueryWriteConfig -> {
if (bigQueryWriteConfig.getSyncMode().equals(WriteDisposition.WRITE_APPEND)) {
partitionIfUnpartitioned(bigQueryWriteConfig, bigquery, bigQueryWriteConfig.getTable());
}
copyTable(bigquery, bigQueryWriteConfig.getTmpTable(), bigQueryWriteConfig.getTable(),
bigQueryWriteConfig.getSyncMode());
});
// BQ is still all or nothing if a failure happens in the destination.
outputRecordCollector.accept(lastStateMessage);
} else {
LOGGER.warn("Had errors while migrations");
LOGGER.warn("Had errors while replicating");
}
} finally {
// clean up tmp tables;
@@ -324,7 +349,6 @@ private static void copyTable(
final TableId sourceTableId,
final TableId destinationTableId,
final WriteDisposition syncMode) {

final CopyJobConfiguration configuration = CopyJobConfiguration.newBuilder(destinationTableId, sourceTableId)
.setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.setWriteDisposition(syncMode)
@@ -336,7 +360,67 @@
LOGGER.error("Failed on copy tables with error:" + job.getStatus());
throw new RuntimeException("BigQuery was unable to copy table due to an error: \n" + job.getStatus().getError());
}
LOGGER.info("successfully copied tmp table: {} to final table: {}", sourceTableId, destinationTableId);
LOGGER.info("successfully copied table: {} to table: {}", sourceTableId, destinationTableId);
}

private void partitionIfUnpartitioned(final BigQueryWriteConfig bigQueryWriteConfig,
final BigQuery bigquery,
final TableId destinationTableId) {
try {
final QueryJobConfiguration queryConfig = QueryJobConfiguration
.newBuilder(
String.format("SELECT max(is_partitioning_column) as is_partitioned FROM `%s.%s.INFORMATION_SCHEMA.COLUMNS` WHERE TABLE_NAME = '%s';",
bigquery.getOptions().getProjectId(),
destinationTableId.getDataset(),
destinationTableId.getTable()))
.setUseLegacySql(false)
.build();
final ImmutablePair<Job, String> result = BigQueryUtils.executeQuery(bigquery, queryConfig);
result.getLeft().getQueryResults().getValues().forEach(row -> {
if (!row.get("is_partitioned").isNull() && row.get("is_partitioned").getStringValue().equals("NO")) {
LOGGER.info("Partitioning existing destination table {}", destinationTableId);
final String tmpPartitionTable = Strings.addRandomSuffix("_airbyte_partitioned_table", "_", 5);
final TableId tmpPartitionTableId = TableId.of(destinationTableId.getDataset(), tmpPartitionTable);
// make sure tmpPartitionTable does not already exist
bigquery.delete(tmpPartitionTableId);
// Use BigQuery SQL to copy because java api copy jobs does not support creating a table from a
// select query, see:
// https://cloud.google.com/bigquery/docs/creating-partitioned-tables#create_a_partitioned_table_from_a_query_result
final QueryJobConfiguration partitionQuery = QueryJobConfiguration
.newBuilder(
getCreatePartitionedTableFromSelectQuery(bigQueryWriteConfig.getSchema(), bigquery.getOptions().getProjectId(), destinationTableId,
tmpPartitionTable))
.setUseLegacySql(false)
.build();
BigQueryUtils.executeQuery(bigquery, partitionQuery);
// Copying data from a partitioned tmp table into an existing non-partitioned table does not make it
// partitioned... thus, we force re-create from scratch by completely deleting and creating new
// table.
bigquery.delete(destinationTableId);
copyTable(bigquery, tmpPartitionTableId, destinationTableId, WriteDisposition.WRITE_EMPTY);
[Review comment from a Contributor]
Shouldn't we be copying data from the destinationTableId into tmpPartitionedTableId instead of the other way around?

[Review comment from @ChristopheDuong (Contributor, Author), Oct 22, 2021]
First, it does a create table ... as select ... from, which is a copy from destinationTableId into tmpPartitionedTableId (and converts it from unpartitioned to partitioned). BigQuery copy jobs don't transfer a table's partitioning mode when copying, hence the SQL approach instead.
Finally, we simply need to "rename" the tmp table back to destinationTableId (a last simple delete/copy does that).

[Review comment from a Contributor]
I see, so we:

  1. Create a partitioned tmpPartitionedTable as select * from destinationTable
  2. Delete the destination table
  3. Copy tmpPartitionedTable into the destination table
  4. Copy the new data from the tmp table (which contains this sync's records) into the destinationTable (sketched in code after this method)

bigquery.delete(tmpPartitionTableId);
}
});
} catch (final InterruptedException e) {
LOGGER.warn("Had errors while partitioning: ", e);
}
}
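The review thread above summarizes the repartitioning sequence in four steps. Below is a condensed, hypothetical sketch of that sequence using the same client APIs and the imports already present in this file; the dataset and table names are invented, error handling is omitted, and the final WRITE_APPEND copy is only indicated in a comment, so treat it as an illustration rather than the connector's actual code path.

```java
// Sketch only: turn an existing unpartitioned destination table into a partitioned one,
// then let the normal close path append this sync's records. All names are illustrative.
void repartitionDestinationSketch(final BigQuery bigquery) throws InterruptedException {
  final String projectId = bigquery.getOptions().getProjectId();
  final TableId destination = TableId.of("my_dataset", "_airbyte_raw_users");
  final TableId partitionedCopy = TableId.of("my_dataset", "_airbyte_partitioned_table_abc12");

  // 1. CREATE TABLE ... PARTITION BY ... AS SELECT, done in SQL because a BigQuery copy job
  //    cannot change a table's partitioning mode.
  bigquery.query(QueryJobConfiguration.newBuilder(String.format(
      "create table `%s.%s.%s` partition by date(_airbyte_emitted_at) as select * from `%s.%s.%s`",
      projectId, partitionedCopy.getDataset(), partitionedCopy.getTable(),
      projectId, destination.getDataset(), destination.getTable()))
      .setUseLegacySql(false)
      .build());

  // 2. Drop the unpartitioned destination table.
  bigquery.delete(destination);

  // 3. Copy the partitioned table back under the destination's name. WRITE_EMPTY is safe here
  //    because the target was just deleted, and the freshly created copy keeps the partitioning.
  final Job copyJob = bigquery.create(JobInfo.of(
      CopyJobConfiguration.newBuilder(destination, partitionedCopy)
          .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
          .setWriteDisposition(WriteDisposition.WRITE_EMPTY)
          .build()));
  copyJob.waitFor();
  bigquery.delete(partitionedCopy);

  // 4. The regular closeNormalBigqueryStreams path then copies this sync's new records from the
  //    tmp table into the now-partitioned destination with WRITE_APPEND.
}
```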

protected String getCreatePartitionedTableFromSelectQuery(final Schema schema,
final String projectId,
final TableId destinationTableId,
final String tmpPartitionTable) {
return String.format("create table `%s.%s.%s` (", projectId, destinationTableId.getDataset(), tmpPartitionTable)
+ schema.getFields().stream()
.map(field -> String.format("%s %s", field.getName(), field.getType()))
.collect(Collectors.joining(", "))
+ ") partition by date("
+ JavaBaseConstants.COLUMN_NAME_EMITTED_AT
+ ") as select "
+ schema.getFields().stream()
.map(Field::getName)
.collect(Collectors.joining(", "))
+ String.format(" from `%s.%s.%s`", projectId, destinationTableId.getDataset(), destinationTableId.getTable());
}
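For concreteness, under the assumption that the raw-table schema is the standard three Airbyte columns (_airbyte_ab_id STRING, _airbyte_emitted_at TIMESTAMP, _airbyte_data STRING) and with invented project, dataset and table names, the statement built above comes out roughly like the string in this small sketch.

```java
public class PartitionedDdlExample {

  public static void main(String[] args) {
    // Illustrative output of getCreatePartitionedTableFromSelectQuery; all names are made up.
    final String ddl =
        "create table `my-project.my_dataset._airbyte_partitioned_table_abc12` ("
            + "_airbyte_ab_id STRING, _airbyte_emitted_at TIMESTAMP, _airbyte_data STRING"
            + ") partition by date(_airbyte_emitted_at)"
            + " as select _airbyte_ab_id, _airbyte_emitted_at, _airbyte_data"
            + " from `my-project.my_dataset._airbyte_raw_users`";
    System.out.println(ddl);
  }
}
```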

private void printHeapMemoryConsumption() {
@@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Clustering;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Job;
@@ -18,8 +19,11 @@
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.JavaBaseConstants;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -69,7 +73,7 @@ static void createSchemaAndTableIfNeeded(final BigQuery bigquery,
createSchemaTable(bigquery, schemaName, datasetLocation);
existingSchemas.add(schemaName);
}
BigQueryUtils.createTable(bigquery, schemaName, tmpTableName, schema);
BigQueryUtils.createPartitionedTable(bigquery, schemaName, tmpTableName, schema);
}

static void createSchemaTable(final BigQuery bigquery, final String datasetId, final String datasetLocation) {
@@ -80,18 +84,32 @@ static void createSchemaTable(final BigQuery bigquery, final String datasetId, f
}
}

// https://cloud.google.com/bigquery/docs/tables#create-table
static void createTable(final BigQuery bigquery, final String datasetName, final String tableName, final Schema schema) {
// https://cloud.google.com/bigquery/docs/creating-partitioned-tables#java
static void createPartitionedTable(final BigQuery bigquery, final String datasetName, final String tableName, final Schema schema) {
try {

final TableId tableId = TableId.of(datasetName, tableName);
final TableDefinition tableDefinition = StandardTableDefinition.of(schema);

final TimePartitioning partitioning = TimePartitioning.newBuilder(TimePartitioning.Type.DAY)
.setField(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)
.build();

final Clustering clustering = Clustering.newBuilder()
.setFields(ImmutableList.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.build();

final StandardTableDefinition tableDefinition =
StandardTableDefinition.newBuilder()
.setSchema(schema)
.setTimePartitioning(partitioning)
.setClustering(clustering)
.build();
final TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();

bigquery.create(tableInfo);
LOGGER.info("Table: {} created successfully", tableId);
} catch (final BigQueryException e) {
LOGGER.info("Table was not created. \n", e);
LOGGER.info("Partitioned Table: {} created successfully", tableId);
} catch (BigQueryException e) {
LOGGER.info("Partitioned table was not created. \n" + e);
}
}
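A usage sketch for the new helper, assuming BigQueryUtils from this connector is on the classpath, that the raw-table columns carry the usual _airbyte_* names, and using invented dataset and table names: create the raw table, then read its definition back to confirm that day partitioning and clustering on _airbyte_emitted_at were applied.

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;

public class CreatePartitionedTableSketch {

  public static void main(String[] args) {
    final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Same shape as the destination's raw-table schema (column names assumed from JavaBaseConstants).
    final Schema schema = Schema.of(
        Field.of("_airbyte_ab_id", StandardSQLTypeName.STRING),
        Field.of("_airbyte_emitted_at", StandardSQLTypeName.TIMESTAMP),
        Field.of("_airbyte_data", StandardSQLTypeName.STRING));

    // Dataset and table names are illustrative only.
    BigQueryUtils.createPartitionedTable(bigquery, "my_dataset", "_airbyte_tmp_users", schema);

    // Read the definition back: time partitioning (DAY on _airbyte_emitted_at) and clustering
    // on the same column should both be set on the new table.
    final StandardTableDefinition definition =
        bigquery.getTable(TableId.of("my_dataset", "_airbyte_tmp_users")).getDefinition();
    System.out.println(definition.getTimePartitioning());
    System.out.println(definition.getClustering());
  }
}
```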
