Skip to content

Commit

Permalink
[ airbytehq#5959 ][ airbytehq#2579 ] Add support of partitioned tables by _airbyte_emitted_at field
Browse files Browse the repository at this point in the history
  • Loading branch information
andresbravog committed Oct 18, 2021
1 parent f194f35 commit 0dbf378
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ dependencies {
implementation 'com.google.cloud:google-cloud-bigquery:1.122.2'
implementation 'org.apache.commons:commons-lang3:3.11'

// csv
implementation 'com.amazonaws:aws-java-sdk-s3:1.11.978'
implementation 'org.apache.commons:commons-csv:1.4'
implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2'

implementation project(':airbyte-config:models')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:models')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ public class BigQueryDestination extends BaseConnector implements Destination {

private static final com.google.cloud.bigquery.Schema SCHEMA = com.google.cloud.bigquery.Schema.of(
Field.of(JavaBaseConstants.COLUMN_NAME_AB_ID, StandardSQLTypeName.STRING),
// GCS works with only date\datetime formats, so need to have it a string for a while
// https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv#data_types
Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.STRING),
Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP),
Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING));

private final BigQuerySQLNameTransformer namingResolver;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -119,7 +120,7 @@ public void acceptTracked(final AirbyteMessage message) throws IOException {
} else {
// GCS uploading way, this data will be moved to bigquery in close method
final GcsCsvWriter gcsCsvWriter = writer.getGcsCsvWriter();
gcsCsvWriter.write(UUID.randomUUID(), recordMessage);
writeRecordToCsv(gcsCsvWriter, recordMessage);
}
} else {
LOGGER.warn("Unexpected message: " + message.getType());
Expand All @@ -138,6 +139,24 @@ protected JsonNode formatRecord(final Schema schema, final AirbyteRecordMessage
JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt));
}

/**
 * Writes a single Airbyte record as one row of the intermediate GCS CSV file
 * (columns: ab_id, emitted_at, serialized data), matching the BigQuery table schema.
 *
 * @param gcsCsvWriter writer whose underlying {@link CSVPrinter} receives the row
 * @param recordMessage record to persist; its emitted-at epoch-millis timestamp is
 *        converted to BigQuery's string-formatted TIMESTAMP representation
 */
protected void writeRecordToCsv(final GcsCsvWriter gcsCsvWriter, final AirbyteRecordMessage recordMessage) {
  // BigQuery represents TIMESTAMP to microsecond precision, so convert millis -> micros,
  // then use the BQ helper to string-format the value correctly for CSV loading.
  final long emittedAtMicroseconds = TimeUnit.MICROSECONDS.convert(recordMessage.getEmittedAt(), TimeUnit.MILLISECONDS);
  final String formattedEmittedAt = QueryParameterValue.timestamp(emittedAtMicroseconds).getValue();
  final JsonNode formattedData = StandardNameTransformer.formatJsonPath(recordMessage.getData());
  try {
    gcsCsvWriter.csvPrinter.printRecord(
        UUID.randomUUID().toString(),
        formattedEmittedAt,
        Jsons.serialize(formattedData));
  } catch (final IOException e) {
    // Log the exception with its stack trace instead of printStackTrace(), so the failure
    // appears in the connector's structured logs. The record is dropped and the sync
    // continues — best-effort, preserving the original swallow-and-continue behavior.
    LOGGER.warn("An error occurred writing CSV file.", e);
  }
}

@Override
public void close(final boolean hasFailed) {
LOGGER.info("Started closing all connections");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.JavaBaseConstants;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.lang3.tuple.ImmutablePair;
Expand Down Expand Up @@ -69,7 +71,7 @@ static void createSchemaAndTableIfNeeded(final BigQuery bigquery,
createSchemaTable(bigquery, schemaName, datasetLocation);
existingSchemas.add(schemaName);
}
BigQueryUtils.createTable(bigquery, schemaName, tmpTableName, schema);
BigQueryUtils.createPartitionedTable(bigquery, schemaName, tmpTableName, schema);
}

static void createSchemaTable(final BigQuery bigquery, final String datasetId, final String datasetLocation) {
Expand All @@ -95,6 +97,32 @@ static void createTable(final BigQuery bigquery, final String datasetName, final
}
}

// https://cloud.google.com/bigquery/docs/creating-partitioned-tables#java
// https://cloud.google.com/bigquery/docs/creating-partitioned-tables#java
/**
 * Creates a BigQuery table partitioned by day on the _airbyte_emitted_at column.
 *
 * @param bigquery client used to issue the create-table call
 * @param datasetName dataset in which to create the table
 * @param tableName name of the table to create
 * @param schema column schema for the new table
 */
static void createPartitionedTable(final BigQuery bigquery, final String datasetName, final String tableName, final Schema schema) {
  try {
    final TableId tableId = TableId.of(datasetName, tableName);

    // Partition by day on the emission timestamp. NOTE(review): setExpirationMs causes
    // BigQuery to DELETE partitions (and their data) older than 90 days — confirm that
    // 90-day retention is acceptable for these tables before relying on this.
    final TimePartitioning partitioning = TimePartitioning.newBuilder(TimePartitioning.Type.DAY)
        .setField(JavaBaseConstants.COLUMN_NAME_EMITTED_AT) // name of column to use for partitioning
        .setExpirationMs(7776000000L) // 90 days in milliseconds
        .build();

    final StandardTableDefinition tableDefinition = StandardTableDefinition.newBuilder()
        .setSchema(schema)
        .setTimePartitioning(partitioning)
        .build();
    final TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();

    bigquery.create(tableInfo);
    LOGGER.info("Partitioned Table: {} created successfully", tableId);
  } catch (final BigQueryException e) {
    // Best-effort: creation failure is swallowed (preserving original behavior), but log it
    // at ERROR with the full exception rather than INFO + string concatenation, so the
    // stack trace is not lost.
    LOGGER.error("Partitioned table was not created: {}", tableName, e);
  }
}

public static JsonNode getGcsJsonNodeConfig(final JsonNode config) {
final JsonNode loadingMethod = config.get(BigQueryConsts.LOADING_METHOD);
final JsonNode gcsJsonNode = Jsons.jsonNode(ImmutableMap.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class GcsCsvWriter extends BaseGcsWriter implements S3Writer {
private final CsvSheetGenerator csvSheetGenerator;
private final StreamTransferManager uploadManager;
private final MultiPartOutputStream outputStream;
private final CSVPrinter csvPrinter;
public final CSVPrinter csvPrinter;
private final String gcsCsvFileLocation; // this used in destination-bigquery (GCS upload type)

public GcsCsvWriter(final GcsDestinationConfig config,
Expand Down

0 comments on commit 0dbf378

Please sign in to comment.