🐛 Destination snowflake: split the snowflake insert query into multiple queries with a specific batch size (#5783)

* fixed snowflake. split the snowflake insert query into multiple queries with a specific batch size

* fixed snowflake. split the snowflake insert query into multiple queries with a specific batch size

* fixed code style

* fixed snowflake destination code style

* fixed snowflake destination code style

* updated documentation and snowflake dockerImageTag

* updated documentation and snowflake dockerImageTag

* updated documentation

* updated snowflake s3 and gcs stream copiers and streamCopierFactories

* updated code style

* updated code style

* updated SnowflakeDatabase

* fixed remarks

* updated DatabricksStreamCopier
andriikorotkov authored Sep 24, 2021
1 parent 41157f7 commit 3378ba4
Showing 6 changed files with 124 additions and 91 deletions.
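The core change named in the title, splitting one large Snowflake INSERT into several smaller statements of a fixed batch size, is not visible in the hunks rendered below (the remaining file diffs did not load). The following is only a minimal sketch of that batching idea; the class name, column names, and batch-size constant are illustrative assumptions, not the actual SnowflakeSqlOperations code from this commit.

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: names and the batch size are hypothetical,
// not the implementation shipped in this commit.
public class BatchedInsertSketch {

  private static final int BATCH_SIZE = 10_000; // hypothetical limit per INSERT

  // Instead of one huge INSERT containing every value tuple, emit one INSERT per batch.
  public static List<String> buildInsertQueries(String tableName, List<String> valueTuples) {
    List<String> queries = new ArrayList<>();
    for (int start = 0; start < valueTuples.size(); start += BATCH_SIZE) {
      int end = Math.min(start + BATCH_SIZE, valueTuples.size());
      String values = String.join(",\n", valueTuples.subList(start, end));
      queries.add("INSERT INTO " + tableName + " (ab_id, data, emitted_at) VALUES\n" + values + ";");
    }
    return queries;
  }
}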
@@ -75,6 +75,7 @@ public class DatabricksStreamCopier implements StreamCopier {
private final S3ParquetWriter parquetWriter;
private final String tmpTableLocation;
private final String destTableLocation;
private final String stagingFolder;

public DatabricksStreamCopier(String stagingFolder,
String schema,
@@ -98,6 +99,7 @@ public DatabricksStreamCopier(String stagingFolder,

this.tmpTableName = nameTransformer.getTmpTableName(streamName);
this.destTableName = nameTransformer.getIdentifier(streamName);
this.stagingFolder = stagingFolder;

S3DestinationConfig stagingS3Config = getStagingS3DestinationConfig(s3Config, stagingFolder);
this.parquetWriter = (S3ParquetWriter) writerFactory.create(stagingS3Config, s3Client, configuredStream, uploadTime);
@@ -116,7 +118,12 @@ public DatabricksStreamCopier(String stagingFolder,
}

@Override
public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception {
public String prepareStagingFile() {
return String.join("/", s3Config.getBucketPath(), stagingFolder);
}

@Override
public void write(UUID id, AirbyteRecordMessage recordMessage, String fileName) throws Exception {
parquetWriter.write(id, recordMessage);
}

@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.4
LABEL io.airbyte.version=0.3.5
LABEL io.airbyte.name=airbyte/destination-jdbc
@@ -107,12 +107,13 @@ private static RecordWriter recordWriterFunction(Map<AirbyteStreamNameNamespaceP
SqlOperations sqlOperations,
Map<AirbyteStreamNameNamespacePair, Long> pairToIgnoredRecordCount) {
return (AirbyteStreamNameNamespacePair pair, List<AirbyteRecordMessage> records) -> {
var fileName = pairToCopier.get(pair).prepareStagingFile();
for (AirbyteRecordMessage recordMessage : records) {
var id = UUID.randomUUID();
if (sqlOperations.isValidData(recordMessage.getData())) {
// TODO Truncate json data instead of throwing whole record away?
// or should we upload it into a special rejected record folder in s3 instead?
pairToCopier.get(pair).write(id, recordMessage);
pairToCopier.get(pair).write(id, recordMessage, fileName);
} else {
pairToIgnoredRecordCount.put(pair, pairToIgnoredRecordCount.getOrDefault(pair, 0L) + 1L);
}
@@ -36,7 +36,7 @@ public interface StreamCopier {
/**
* Writes a value to a staging file for the stream.
*/
void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception;
void write(UUID id, AirbyteRecordMessage recordMessage, String fileName) throws Exception;

/**
* Closes the writer for the stream to the staging persistence. This method should block until all
@@ -78,4 +78,11 @@ public interface StreamCopier {
*/
void removeFileAndDropTmpTable() throws Exception;

/**
* Creates the staging file and all the necessary items to write data to this file.
*
* @return the name of the staging file
*/
String prepareStagingFile();

}
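For reference, a small sketch of how the extended interface is driven, mirroring the recordWriterFunction change above: one staging file is prepared per batch of records, and every record in that batch is written against the returned file name. Package names for the Airbyte classes are assumed from the repository layout at the time.

import java.util.List;
import java.util.UUID;

import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; // package assumed
import io.airbyte.protocol.models.AirbyteRecordMessage;            // package assumed

public class StreamCopierUsageSketch {

  // Prepare one staging file per batch, then write every record in the batch to it.
  static void writeBatch(StreamCopier copier, List<AirbyteRecordMessage> records) throws Exception {
    String stagingFileName = copier.prepareStagingFile();
    for (AirbyteRecordMessage record : records) {
      copier.write(UUID.randomUUID(), record, stagingFileName);
    }
  }
}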
@@ -31,6 +31,7 @@
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
@@ -47,6 +48,9 @@
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
@@ -57,18 +61,19 @@ public abstract class GcsStreamCopier implements StreamCopier {

private static final Logger LOGGER = LoggerFactory.getLogger(GcsStreamCopier.class);

private final String gcsStagingFile;
private final Storage storageClient;
private final GcsConfig gcsConfig;
private final WriteChannel channel;
private final CSVPrinter csvPrinter;
private final String tmpTableName;
private final DestinationSyncMode destSyncMode;
private final String schemaName;
private final String streamName;
private final JdbcDatabase db;
private final ExtendedNameTransformer nameTransformer;
private final SqlOperations sqlOperations;
private final Set<String> gcsStagingFiles = new HashSet<>();
private final HashMap<String, WriteChannel> channels = new HashMap<>();
private final HashMap<String, CSVPrinter> csvPrinters = new HashMap<>();
private final String stagingFolder;

public GcsStreamCopier(String stagingFolder,
DestinationSyncMode destSyncMode,
@@ -82,59 +87,79 @@ public GcsStreamCopier(String stagingFolder,
this.destSyncMode = destSyncMode;
this.schemaName = schema;
this.streamName = streamName;
this.stagingFolder = stagingFolder;
this.db = db;
this.nameTransformer = nameTransformer;
this.sqlOperations = sqlOperations;
this.tmpTableName = nameTransformer.getTmpTableName(streamName);
this.storageClient = storageClient;
this.gcsConfig = gcsConfig;
}

this.gcsStagingFile = String.join("/", stagingFolder, schemaName, streamName);
private String prepareGcsStagingFile() {
return String.join("/", stagingFolder, schemaName, Strings.addRandomSuffix("", "", 3) + "_" + streamName);
}

var blobId = BlobId.of(gcsConfig.getBucketName(), gcsStagingFile);
@Override
public String prepareStagingFile() {
var name = prepareGcsStagingFile();
gcsStagingFiles.add(name);
var blobId = BlobId.of(gcsConfig.getBucketName(), name);
var blobInfo = BlobInfo.newBuilder(blobId).build();
var blob = storageClient.create(blobInfo);
this.channel = blob.writer();
var channel = blob.writer();
channels.put(name, channel);
OutputStream outputStream = Channels.newOutputStream(channel);

var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8);
try {
this.csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT);
csvPrinters.put(name, new CSVPrinter(writer, CSVFormat.DEFAULT));
} catch (IOException e) {
throw new RuntimeException(e);
}
return name;
}

@Override
public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception {
csvPrinter.printRecord(id,
Jsons.serialize(recordMessage.getData()),
Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt())));
public void write(UUID id, AirbyteRecordMessage recordMessage, String gcsFileName) throws Exception {
if (csvPrinters.containsKey(gcsFileName)) {
csvPrinters.get(gcsFileName).printRecord(id,
Jsons.serialize(recordMessage.getData()),
Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt())));
}
}

@Override
public void closeStagingUploader(boolean hasFailed) throws Exception {
LOGGER.info("Uploading remaining data for {} stream.", streamName);
csvPrinter.close();
channel.close();
for (var csvPrinter : csvPrinters.values()) {
csvPrinter.close();
}
for (var channel : channels.values()) {
channel.close();
}
LOGGER.info("All data for {} stream uploaded.", streamName);
}

@Override
public void copyStagingFileToTemporaryTable() throws Exception {
LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, schema: {}.", tmpTableName, streamName, schemaName);
copyGcsCsvFileIntoTable(db, getFullGcsPath(gcsConfig.getBucketName(), gcsStagingFile), schemaName, tmpTableName, gcsConfig);
for (var gcsStagingFile : gcsStagingFiles) {
copyGcsCsvFileIntoTable(db, getFullGcsPath(gcsConfig.getBucketName(), gcsStagingFile), schemaName, tmpTableName, gcsConfig);
}
LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName);
}

@Override
public void removeFileAndDropTmpTable() throws Exception {
LOGGER.info("Begin cleaning gcs staging file {}.", gcsStagingFile);
var blobId = BlobId.of(gcsConfig.getBucketName(), gcsStagingFile);
if (storageClient.get(blobId).exists()) {
storageClient.delete(blobId);
for (var gcsStagingFile : gcsStagingFiles) {
LOGGER.info("Begin cleaning gcs staging file {}.", gcsStagingFile);
var blobId = BlobId.of(gcsConfig.getBucketName(), gcsStagingFile);
if (storageClient.get(blobId).exists()) {
storageClient.delete(blobId);
}
LOGGER.info("GCS staging file {} cleaned.", gcsStagingFile);
}
LOGGER.info("GCS staging file {} cleaned.", gcsStagingFile);

LOGGER.info("Begin cleaning {} tmp table in destination.", tmpTableName);
sqlOperations.dropTableIfExists(db, schemaName, tmpTableName);