🐛 Destination snowflake: split the snowflake insert query into multiple queries with a specific batch size #5783

Merged · 23 commits · Sep 24, 2021

Commits
bf8615a
fixed snowflake. split the snowflake insert query into multiple queri…
andriikorotkov Sep 1, 2021
d9eac10
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Sep 1, 2021
13cca44
fixed snowflake. split the snowflake insert query into multiple queri…
andriikorotkov Sep 1, 2021
cc61a8d
fixed code style
andriikorotkov Sep 1, 2021
cc4133a
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Sep 1, 2021
81a282a
fixed snowflake destination code style
andriikorotkov Sep 1, 2021
c1d1a17
fixed snowflake destination code style
andriikorotkov Sep 1, 2021
54c11e8
updated documentation and snowflake dockerImageTag
andriikorotkov Sep 1, 2021
5509439
updated documentation and snowflake dockerImageTag
andriikorotkov Sep 1, 2021
faf05e3
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Sep 3, 2021
de34252
updated documentation
andriikorotkov Sep 3, 2021
08d5e08
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Sep 9, 2021
4ea0631
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Sep 13, 2021
0e0924a
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Sep 13, 2021
332144e
updated snowflake s3 and gcs stream copiers and streamCopierFactories
andriikorotkov Sep 17, 2021
810b46f
updated code style
andriikorotkov Sep 17, 2021
9dfb63f
updated code style
andriikorotkov Sep 17, 2021
47358c0
updated SnowflakeDatabase
andriikorotkov Sep 17, 2021
1eca4b0
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Sep 20, 2021
a574596
fixed remarks
andriikorotkov Sep 21, 2021
eefb956
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Sep 22, 2021
3c83807
updated DatabricksStreamCopier
andriikorotkov Sep 22, 2021
6b056eb
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Sep 24, 2021
Files changed

DatabricksStreamCopier.java

@@ -75,6 +75,7 @@ public class DatabricksStreamCopier implements StreamCopier {
   private final S3ParquetWriter parquetWriter;
   private final String tmpTableLocation;
   private final String destTableLocation;
+  private final String stagingFolder;
 
   public DatabricksStreamCopier(String stagingFolder,
                                 String schema,
@@ -98,6 +99,7 @@ public DatabricksStreamCopier(String stagingFolder,
 
     this.tmpTableName = nameTransformer.getTmpTableName(streamName);
     this.destTableName = nameTransformer.getIdentifier(streamName);
+    this.stagingFolder = stagingFolder;
 
     S3DestinationConfig stagingS3Config = getStagingS3DestinationConfig(s3Config, stagingFolder);
     this.parquetWriter = (S3ParquetWriter) writerFactory.create(stagingS3Config, s3Client, configuredStream, uploadTime);
@@ -116,7 +118,12 @@ public DatabricksStreamCopier(String stagingFolder,
   }
 
   @Override
-  public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception {
+  public String prepareStagingFile() {
+    return String.join("/", s3Config.getBucketPath(), stagingFolder);
+  }
+
+  @Override
+  public void write(UUID id, AirbyteRecordMessage recordMessage, String fileName) throws Exception {
     parquetWriter.write(id, recordMessage);
   }
 
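A note on this copier: unlike the GCS implementation further down, DatabricksStreamCopier keeps a single Parquet writer per stream, so the new fileName argument is accepted but unused, and prepareStagingFile() returns the same fixed staging path on every call. A minimal sketch of the value it returns, using hypothetical stand-ins for the real config values:

```java
// Illustration only; "airbyte-bucket" and "9c3e5a7f" are hypothetical stand-ins
// for s3Config.getBucketPath() and the copier's stagingFolder.
public class DatabricksStagingPathSketch {
  public static void main(String[] args) {
    // Same join as prepareStagingFile() above.
    System.out.println(String.join("/", "airbyte-bucket", "9c3e5a7f"));
    // prints: airbyte-bucket/9c3e5a7f
  }
}
```
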
Dockerfile (airbyte/destination-jdbc)

@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
 
 RUN tar xf ${APPLICATION}.tar --strip-components=1
 
-LABEL io.airbyte.version=0.3.4
+LABEL io.airbyte.version=0.3.5
 LABEL io.airbyte.name=airbyte/destination-jdbc
CopyConsumerFactory.java

@@ -107,12 +107,13 @@ private static RecordWriter recordWriterFunction(Map<AirbyteStreamNameNamespacePair, StreamCopier> pairToCopier,
                                                  SqlOperations sqlOperations,
                                                  Map<AirbyteStreamNameNamespacePair, Long> pairToIgnoredRecordCount) {
     return (AirbyteStreamNameNamespacePair pair, List<AirbyteRecordMessage> records) -> {
+      var fileName = pairToCopier.get(pair).prepareStagingFile();
       for (AirbyteRecordMessage recordMessage : records) {
         var id = UUID.randomUUID();
         if (sqlOperations.isValidData(recordMessage.getData())) {
           // TODO Truncate json data instead of throwing whole record away?
           // or should we upload it into a special rejected record folder in s3 instead?
-          pairToCopier.get(pair).write(id, recordMessage);
+          pairToCopier.get(pair).write(id, recordMessage, fileName);
         } else {
           pairToIgnoredRecordCount.put(pair, pairToIgnoredRecordCount.getOrDefault(pair, 0L) + 1L);
         }
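This lambda is the heart of the change: each flushed batch first asks the copier for a staging file, and every record in that batch is written to it, so staging files (and the bulk loads built from them) are naturally capped at the buffer size. A self-contained sketch of that flow, using simplified stand-in types rather than Airbyte's real ones:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Simplified stand-ins, illustration only — not Airbyte's real types.
public class BatchFlowSketch {

  interface MiniCopier {
    String prepareStagingFile();
    void write(UUID id, String record, String fileName);
  }

  // Mirrors the shape of recordWriterFunction above: one staging file per
  // flushed batch; every record in the batch targets that same file.
  static void flushBatch(MiniCopier copier, List<String> records) {
    String fileName = copier.prepareStagingFile();
    for (String record : records) {
      copier.write(UUID.randomUUID(), record, fileName);
    }
  }

  public static void main(String[] args) {
    List<String> staged = new ArrayList<>();
    MiniCopier copier = new MiniCopier() {
      private int counter = 0;

      @Override
      public String prepareStagingFile() {
        return "staging/file_" + counter++;
      }

      @Override
      public void write(UUID id, String record, String fileName) {
        staged.add(fileName + " <- " + record);
      }
    };
    flushBatch(copier, List.of("{\"a\":1}", "{\"a\":2}")); // both go to staging/file_0
    flushBatch(copier, List.of("{\"a\":3}"));              // goes to staging/file_1
    staged.forEach(System.out::println);
  }
}
```
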
StreamCopier.java

@@ -36,7 +36,7 @@ public interface StreamCopier {
   /**
    * Writes a value to a staging file for the stream.
    */
-  void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception;
+  void write(UUID id, AirbyteRecordMessage recordMessage, String fileName) throws Exception;
 
   /**
    * Closes the writer for the stream to the staging persistence. This method should block until all
@@ -78,4 +78,11 @@ public interface StreamCopier {
    */
   void removeFileAndDropTmpTable() throws Exception;
 
+  /**
+   * Creates the staging file and all the necessary items to write data to this file.
+   *
+   * @return the name of the staging file
+   */
+  String prepareStagingFile();
+
 }
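Put together, the interface now implies a per-batch lifecycle along these lines. This is a caller-side sketch, not Airbyte code: only methods visible in this PR are used, `copier` and `batch` are assumed given, and error handling is elided.

```java
String file = copier.prepareStagingFile();        // create a staging file, get its name
for (AirbyteRecordMessage message : batch) {
  copier.write(UUID.randomUUID(), message, file); // stage each record into that file
}
copier.closeStagingUploader(false);               // success path: flush and close staging writers
copier.copyStagingFileToTemporaryTable();         // bulk-load staged files into the tmp table
copier.removeFileAndDropTmpTable();               // delete staging files and drop the tmp table
```
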
GcsStreamCopier.java

@@ -31,6 +31,7 @@
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageOptions;
 import io.airbyte.commons.json.Jsons;
+import io.airbyte.commons.string.Strings;
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
@@ -47,6 +48,9 @@
 import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.time.Instant;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVPrinter;
@@ -57,18 +61,19 @@ public abstract class GcsStreamCopier implements StreamCopier {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(GcsStreamCopier.class);
 
-  private final String gcsStagingFile;
   private final Storage storageClient;
   private final GcsConfig gcsConfig;
-  private final WriteChannel channel;
-  private final CSVPrinter csvPrinter;
   private final String tmpTableName;
   private final DestinationSyncMode destSyncMode;
   private final String schemaName;
   private final String streamName;
   private final JdbcDatabase db;
   private final ExtendedNameTransformer nameTransformer;
   private final SqlOperations sqlOperations;
+  private final Set<String> gcsStagingFiles = new HashSet<>();
+  private final HashMap<String, WriteChannel> channels = new HashMap<>();
+  private final HashMap<String, CSVPrinter> csvPrinters = new HashMap<>();
+  private final String stagingFolder;
 
   public GcsStreamCopier(String stagingFolder,
                          DestinationSyncMode destSyncMode,
@@ -82,59 +87,79 @@ public GcsStreamCopier(String stagingFolder,
     this.destSyncMode = destSyncMode;
     this.schemaName = schema;
     this.streamName = streamName;
+    this.stagingFolder = stagingFolder;
     this.db = db;
     this.nameTransformer = nameTransformer;
     this.sqlOperations = sqlOperations;
     this.tmpTableName = nameTransformer.getTmpTableName(streamName);
     this.storageClient = storageClient;
     this.gcsConfig = gcsConfig;
+  }
 
-    this.gcsStagingFile = String.join("/", stagingFolder, schemaName, streamName);
+  private String prepareGcsStagingFile() {
+    return String.join("/", stagingFolder, schemaName, Strings.addRandomSuffix("", "", 3) + "_" + streamName);
+  }
 
-    var blobId = BlobId.of(gcsConfig.getBucketName(), gcsStagingFile);
+  @Override
+  public String prepareStagingFile() {
+    var name = prepareGcsStagingFile();
+    gcsStagingFiles.add(name);
+    var blobId = BlobId.of(gcsConfig.getBucketName(), name);
     var blobInfo = BlobInfo.newBuilder(blobId).build();
     var blob = storageClient.create(blobInfo);
-    this.channel = blob.writer();
+    var channel = blob.writer();
+    channels.put(name, channel);
     OutputStream outputStream = Channels.newOutputStream(channel);
 
     var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8);
     try {
-      this.csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT);
+      csvPrinters.put(name, new CSVPrinter(writer, CSVFormat.DEFAULT));
     } catch (IOException e) {
       throw new RuntimeException(e);
    }
+    return name;
   }
 
   @Override
-  public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception {
-    csvPrinter.printRecord(id,
-        Jsons.serialize(recordMessage.getData()),
-        Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt())));
+  public void write(UUID id, AirbyteRecordMessage recordMessage, String gcsFileName) throws Exception {
+    if (csvPrinters.containsKey(gcsFileName)) {
+      csvPrinters.get(gcsFileName).printRecord(id,
+          Jsons.serialize(recordMessage.getData()),
+          Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt())));
+    }
   }
 
   @Override
   public void closeStagingUploader(boolean hasFailed) throws Exception {
     LOGGER.info("Uploading remaining data for {} stream.", streamName);
-    csvPrinter.close();
-    channel.close();
+    for (var csvPrinter : csvPrinters.values()) {
+      csvPrinter.close();
+    }
+    for (var channel : channels.values()) {
+      channel.close();
+    }
     LOGGER.info("All data for {} stream uploaded.", streamName);
   }
 
   @Override
   public void copyStagingFileToTemporaryTable() throws Exception {
     LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, schema: {}.", tmpTableName, streamName, schemaName);
-    copyGcsCsvFileIntoTable(db, getFullGcsPath(gcsConfig.getBucketName(), gcsStagingFile), schemaName, tmpTableName, gcsConfig);
+    for (var gcsStagingFile : gcsStagingFiles) {
+      copyGcsCsvFileIntoTable(db, getFullGcsPath(gcsConfig.getBucketName(), gcsStagingFile), schemaName, tmpTableName, gcsConfig);
+    }
     LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName);
   }
 
   @Override
   public void removeFileAndDropTmpTable() throws Exception {
-    LOGGER.info("Begin cleaning gcs staging file {}.", gcsStagingFile);
-    var blobId = BlobId.of(gcsConfig.getBucketName(), gcsStagingFile);
-    if (storageClient.get(blobId).exists()) {
-      storageClient.delete(blobId);
+    for (var gcsStagingFile : gcsStagingFiles) {
+      LOGGER.info("Begin cleaning gcs staging file {}.", gcsStagingFile);
+      var blobId = BlobId.of(gcsConfig.getBucketName(), gcsStagingFile);
+      if (storageClient.get(blobId).exists()) {
+        storageClient.delete(blobId);
+      }
+      LOGGER.info("GCS staging file {} cleaned.", gcsStagingFile);
     }
-    LOGGER.info("GCS staging file {} cleaned.", gcsStagingFile);
 
     LOGGER.info("Begin cleaning {} tmp table in destination.", tmpTableName);
     sqlOperations.dropTableIfExists(db, schemaName, tmpTableName);
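Because every prepareStagingFile() call creates a fresh blob for the same stream, the generated names must not collide; the short random prefix handles that. A standalone sketch of the resulting object names, assuming (not confirmed here) that Strings.addRandomSuffix("", "", 3) returns a bare three-character random alphanumeric string; the folder, schema, and stream names are hypothetical:

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustration only: approximates the staging-object naming in
// prepareGcsStagingFile() above.
public class GcsStagingNameSketch {

  static String randomSuffix(int length) {
    String alphabet = "abcdefghijklmnopqrstuvwxyz0123456789";
    StringBuilder sb = new StringBuilder(length);
    for (int i = 0; i < length; i++) {
      sb.append(alphabet.charAt(ThreadLocalRandom.current().nextInt(alphabet.length())));
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    String stagingFolder = "0a1b2c3d"; // hypothetical per-sync folder
    String schemaName = "public";      // hypothetical
    String streamName = "users";       // hypothetical
    // Two calls -> two distinct staging objects for the same stream,
    // e.g. "0a1b2c3d/public/x7q_users" and "0a1b2c3d/public/9fk_users".
    for (int i = 0; i < 2; i++) {
      String name = String.join("/", stagingFolder, schemaName, randomSuffix(3) + "_" + streamName);
      System.out.println(name);
    }
  }
}
```
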