diff --git a/airbyte-integrations/connectors/destination-bigquery/build.gradle b/airbyte-integrations/connectors/destination-bigquery/build.gradle index 86bf75b9cc55..c41aea7e13e8 100644 --- a/airbyte-integrations/connectors/destination-bigquery/build.gradle +++ b/airbyte-integrations/connectors/destination-bigquery/build.gradle @@ -13,6 +13,11 @@ dependencies { implementation 'com.google.cloud:google-cloud-bigquery:1.122.2' implementation 'org.apache.commons:commons-lang3:3.11' + // csv + implementation 'com.amazonaws:aws-java-sdk-s3:1.11.978' + implementation 'org.apache.commons:commons-csv:1.4' + implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2' + implementation project(':airbyte-config:models') implementation project(':airbyte-integrations:bases:base-java') implementation project(':airbyte-protocol:models') diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index 4c3d1087e629..86061c7ca85f 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -60,9 +60,7 @@ public class BigQueryDestination extends BaseConnector implements Destination { private static final com.google.cloud.bigquery.Schema SCHEMA = com.google.cloud.bigquery.Schema.of( Field.of(JavaBaseConstants.COLUMN_NAME_AB_ID, StandardSQLTypeName.STRING), - // GCS works with only date\datetime formats, so need to have it a string for a while - // https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv#data_types - Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.STRING), + 
Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP), Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING)); private final BigQuerySQLNameTransformer namingResolver; diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java index dc293e176c27..d5ec5996b030 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java @@ -50,6 +50,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; +import org.apache.commons.csv.CSVPrinter; import org.apache.commons.lang3.tuple.ImmutablePair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -119,7 +120,7 @@ public void acceptTracked(final AirbyteMessage message) throws IOException { } else { // GCS uploading way, this data will be moved to bigquery in close method final GcsCsvWriter gcsCsvWriter = writer.getGcsCsvWriter(); - gcsCsvWriter.write(UUID.randomUUID(), recordMessage); + writeRecordToCsv(gcsCsvWriter, recordMessage); } } else { LOGGER.warn("Unexpected message: " + message.getType()); @@ -138,6 +139,24 @@ protected JsonNode formatRecord(final Schema schema, final AirbyteRecordMessage JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt)); } + protected void writeRecordToCsv(final GcsCsvWriter gcsCsvWriter, final AirbyteRecordMessage recordMessage) { + // Bigquery represents TIMESTAMP to the microsecond precision, so we convert to microseconds then + // use BQ helpers to string-format correctly. 
+ final long emittedAtMicroseconds = TimeUnit.MICROSECONDS.convert(recordMessage.getEmittedAt(), TimeUnit.MILLISECONDS); + final String formattedEmittedAt = QueryParameterValue.timestamp(emittedAtMicroseconds).getValue(); + final JsonNode formattedData = StandardNameTransformer.formatJsonPath(recordMessage.getData()); + try { + gcsCsvWriter.csvPrinter.printRecord( + UUID.randomUUID().toString(), + formattedEmittedAt, + Jsons.serialize(formattedData) + ); + } catch (IOException e) { + LOGGER.warn("An error occurred writing CSV file.", e); + // NOTE(review): the IOException is swallowed here, so the record is silently dropped — consider rethrowing + } + } + @Override public void close(final boolean hasFailed) { LOGGER.info("Started closing all connections"); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java index 1f065b61785e..79b4bbe6fc2a 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java @@ -18,8 +18,10 @@ import com.google.cloud.bigquery.TableDefinition; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.TimePartitioning; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.JavaBaseConstants; import java.util.Set; import java.util.UUID; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -69,7 +71,7 @@ static void createSchemaAndTableIfNeeded(final BigQuery bigquery, createSchemaTable(bigquery, schemaName, datasetLocation); existingSchemas.add(schemaName); } - BigQueryUtils.createTable(bigquery, schemaName, tmpTableName, schema); + 
BigQueryUtils.createPartitionedTable(bigquery, schemaName, tmpTableName, schema); } static void createSchemaTable(final BigQuery bigquery, final String datasetId, final String datasetLocation) { @@ -95,6 +97,32 @@ static void createTable(final BigQuery bigquery, final String datasetName, final } } + // https://cloud.google.com/bigquery/docs/creating-partitioned-tables#java + static void createPartitionedTable(final BigQuery bigquery, final String datasetName, final String tableName, final Schema schema) { + try { + + TableId tableId = TableId.of(datasetName, tableName); + + TimePartitioning partitioning = + TimePartitioning.newBuilder(TimePartitioning.Type.DAY) + .setField(JavaBaseConstants.COLUMN_NAME_EMITTED_AT) // name of column to use for partitioning + .setExpirationMs(7776000000L) // 90 days + .build(); + + StandardTableDefinition tableDefinition = + StandardTableDefinition.newBuilder() + .setSchema(schema) + .setTimePartitioning(partitioning) + .build(); + TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build(); + + bigquery.create(tableInfo); + LOGGER.info("Partitioned Table: {} created successfully", tableId); + } catch (BigQueryException e) { + LOGGER.warn("Partitioned table was not created. 
\n" + e.toString()); + } + } + public static JsonNode getGcsJsonNodeConfig(final JsonNode config) { final JsonNode loadingMethod = config.get(BigQueryConsts.LOADING_METHOD); final JsonNode gcsJsonNode = Jsons.jsonNode(ImmutableMap.builder() diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java index 38f22a5bd10c..bb0abe8a5e82 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java @@ -34,7 +34,7 @@ public class GcsCsvWriter extends BaseGcsWriter implements S3Writer { private final CsvSheetGenerator csvSheetGenerator; private final StreamTransferManager uploadManager; private final MultiPartOutputStream outputStream; - private final CSVPrinter csvPrinter; + public final CSVPrinter csvPrinter; private final String gcsCsvFileLocation; // this used in destination-bigquery (GCS upload type) public GcsCsvWriter(final GcsDestinationConfig config,