Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destinations V2: move create raw tables earlier #28255

Merged
merged 3 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ protected void startTracked() throws InterruptedException {
if (use1s1t) {
// TODO extract common logic with GCS record consumer + extract into a higher level class
// For each stream, make sure that its corresponding final table exists.
for (StreamConfig stream : catalog.streams()) {
for (final StreamConfig stream : catalog.streams()) {
final Optional<TableDefinition> existingTable = destinationHandler.findExistingTable(stream.id());
if (existingTable.isEmpty()) {
destinationHandler.execute(sqlGenerator.createTable(stream, ""));
Expand All @@ -111,9 +111,9 @@ protected void startTracked() throws InterruptedException {
}

// For streams in overwrite mode, truncate the raw table and create a tmp table.
// non-1s1t syncs actually overwrite the raw table at the end of of the sync, wo we only do this in
// non-1s1t syncs actually overwrite the raw table at the end of the sync, so we only do this in
// 1s1t mode.
for (StreamConfig stream : catalog.streams()) {
for (final StreamConfig stream : catalog.streams()) {
LOGGER.info("Stream {} has sync mode {}", stream.id(), stream.destinationSyncMode());
final String suffix = overwriteStreamsWithTmpTable.get(stream.id());
if (stream.destinationSyncMode() == DestinationSyncMode.OVERWRITE && suffix != null && !suffix.isEmpty()) {
Expand All @@ -126,6 +126,10 @@ protected void startTracked() throws InterruptedException {
destinationHandler.execute(sqlGenerator.createTable(stream, suffix));
}
}

uploaderMap.forEach((streamId, uploader) -> {
uploader.createRawTable();
});
}
}

Expand Down Expand Up @@ -203,7 +207,7 @@ public void close(final boolean hasFailed) {

private void doTypingAndDeduping(final StreamConfig stream) throws InterruptedException {
if (use1s1t) {
String suffix;
final String suffix;
suffix = overwriteStreamsWithTmpTable.getOrDefault(stream.id(), "");
final String sql = sqlGenerator.updateTable(suffix, stream);
destinationHandler.execute(sql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,6 @@ protected void uploadData(final Consumer<AirbyteMessage> outputRecordCollector,
LOGGER.info("Uploading data from the tmp table {} to the source table {}.", tmpTable.getTable(), table.getTable());
uploadDataToTableFromTmpTable();
LOGGER.info("Data is successfully loaded to the source table {}!", table.getTable());
} else {
// Otherwise, we just need to ensure that this table exists.
// TODO alter an existing raw table?
final Table rawTable = bigQuery.getTable(table);
if (rawTable == null) {
bigQuery.create(TableInfo.newBuilder(table, StandardTableDefinition.of(recordFormatter.getBigQuerySchema())).build());
}
}

outputRecordCollector.accept(lastStateMessage);
Expand All @@ -126,6 +119,18 @@ protected void uploadData(final Consumer<AirbyteMessage> outputRecordCollector,
}
}

public void createRawTable() {
// Ensure that this table exists.
// TODO alter an existing raw table?
final Table rawTable = bigQuery.getTable(table);
if (rawTable == null) {
LOGGER.info("Creating raw table {}.", table);
bigQuery.create(TableInfo.newBuilder(table, StandardTableDefinition.of(recordFormatter.getBigQuerySchema())).build());
} else {
LOGGER.info("Found raw table {}.", rawTable.getTableId());
}
}

protected void dropTmpTable() {
try {
// clean up tmp tables;
Expand Down
Loading