diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java
index 9844d684711c..6b780528d067 100644
--- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java
+++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java
@@ -75,6 +75,7 @@ public class DatabricksStreamCopier implements StreamCopier {
   private final S3ParquetWriter parquetWriter;
   private final String tmpTableLocation;
   private final String destTableLocation;
+  private final String stagingFolder;
 
   public DatabricksStreamCopier(String stagingFolder,
                                 String schema,
@@ -98,6 +99,7 @@ public DatabricksStreamCopier(String stagingFolder,
 
     this.tmpTableName = nameTransformer.getTmpTableName(streamName);
     this.destTableName = nameTransformer.getIdentifier(streamName);
+    this.stagingFolder = stagingFolder;
 
     S3DestinationConfig stagingS3Config = getStagingS3DestinationConfig(s3Config, stagingFolder);
     this.parquetWriter = (S3ParquetWriter) writerFactory.create(stagingS3Config, s3Client, configuredStream, uploadTime);
@@ -116,7 +118,12 @@ public DatabricksStreamCopier(String stagingFolder,
   }
 
   @Override
-  public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception {
+  public String prepareStagingFile() {
+    return String.join("/", s3Config.getBucketPath(), stagingFolder);
+  }
+
+  @Override
+  public void write(UUID id, AirbyteRecordMessage recordMessage, String fileName) throws Exception {
     parquetWriter.write(id, recordMessage);
   }
 
diff --git a/airbyte-integrations/connectors/destination-jdbc/Dockerfile b/airbyte-integrations/connectors/destination-jdbc/Dockerfile
index 612b4d004600..1de711a53ec7 100644
--- a/airbyte-integrations/connectors/destination-jdbc/Dockerfile
+++ b/airbyte-integrations/connectors/destination-jdbc/Dockerfile
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
 
 RUN tar xf ${APPLICATION}.tar --strip-components=1
 
-LABEL io.airbyte.version=0.3.4
+LABEL io.airbyte.version=0.3.5
 LABEL io.airbyte.name=airbyte/destination-jdbc
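Note (not part of the diff): the Databricks copier keeps a single Parquet writer per stream, so it can satisfy the new per-file contract by returning a fixed staging prefix from `prepareStagingFile()` and ignoring the file name handed to `write()`. A minimal sketch of that shape follows; `SingleFileStreamCopierSketch` and `SingleStreamWriter` are hypothetical names, not types from the PR.

```java
import java.util.UUID;

// Hypothetical stand-in for the stream's single Parquet writer.
interface SingleStreamWriter {
  void write(UUID id, String serializedRecord);
}

public class SingleFileStreamCopierSketch {

  private final String bucketPath;
  private final String stagingFolder;
  private final SingleStreamWriter writer;

  public SingleFileStreamCopierSketch(String bucketPath, String stagingFolder, SingleStreamWriter writer) {
    this.bucketPath = bucketPath;
    this.stagingFolder = stagingFolder;
    this.writer = writer;
  }

  // Mirrors the Databricks approach: the returned "file name" is really a stable staging prefix.
  public String prepareStagingFile() {
    return String.join("/", bucketPath, stagingFolder);
  }

  // fileName is accepted to satisfy the new interface but is not needed with a single writer.
  public void write(UUID id, String serializedRecord, String fileName) {
    writer.write(id, serializedRecord);
  }
}
```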
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java
index a11d341b9144..7baa0874ff89 100644
--- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java
+++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java
@@ -107,12 +107,13 @@ private static RecordWriter recordWriterFunction(Map<AirbyteStreamNameNamespacePair, StreamCopier> pairToCopier,
                                                    SqlOperations sqlOperations,
                                                    Map<AirbyteStreamNameNamespacePair, Long> pairToIgnoredRecordCount) {
     return (AirbyteStreamNameNamespacePair pair, List<AirbyteRecordMessage> records) -> {
+      var fileName = pairToCopier.get(pair).prepareStagingFile();
       for (AirbyteRecordMessage recordMessage : records) {
         var id = UUID.randomUUID();
         if (sqlOperations.isValidData(recordMessage.getData())) {
           // TODO Truncate json data instead of throwing whole record away?
           // or should we upload it into a special rejected record folder in s3 instead?
-          pairToCopier.get(pair).write(id, recordMessage);
+          pairToCopier.get(pair).write(id, recordMessage, fileName);
         } else {
           pairToIgnoredRecordCount.put(pair, pairToIgnoredRecordCount.getOrDefault(pair, 0L) + 1L);
         }
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/StreamCopier.java
index 7af1a6910a10..74161778ef2f 100644
--- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/StreamCopier.java
+++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/StreamCopier.java
@@ -36,7 +36,7 @@ public interface StreamCopier {
   /**
    * Writes a value to a staging file for the stream.
    */
-  void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception;
+  void write(UUID id, AirbyteRecordMessage recordMessage, String fileName) throws Exception;
 
   /**
    * Closes the writer for the stream to the staging persistence. This method should block until all
@@ -78,4 +78,11 @@ public interface StreamCopier {
    */
   void removeFileAndDropTmpTable() throws Exception;
 
+  /**
+   * Creates the staging file and all the necessary items to write data to this file.
+   *
+   * @return the name of the staging file
+   */
+  String prepareStagingFile();
+
 }
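For readers skimming the interface change, here is a minimal, self-contained sketch of how the revised contract is driven by `CopyConsumerFactory`: `prepareStagingFile()` is called once per incoming batch, and the returned name is threaded through every `write()` in that batch. This is not code from the PR; the inlined interface and `StubCopier` are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class StreamCopierContractSketch {

  // Trimmed-down view of the two methods this PR touches on StreamCopier.
  interface StreamCopier {
    String prepareStagingFile();
    void write(UUID id, String record, String fileName) throws Exception;
  }

  static class StubCopier implements StreamCopier {
    final List<String> stagingFiles = new ArrayList<>();

    @Override
    public String prepareStagingFile() {
      var name = "staging/" + UUID.randomUUID();   // one new staging file per batch
      stagingFiles.add(name);
      return name;
    }

    @Override
    public void write(UUID id, String record, String fileName) {
      System.out.println(fileName + " <- " + id + ": " + record);
    }
  }

  public static void main(String[] args) throws Exception {
    StreamCopier copier = new StubCopier();
    List<String> batch = List.of("{\"a\":1}", "{\"a\":2}");
    var fileName = copier.prepareStagingFile();       // once per batch, as in recordWriterFunction
    for (String record : batch) {
      copier.write(UUID.randomUUID(), record, fileName);
    }
  }
}
```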
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java
index 69fe6dddff78..7981469f3f61 100644
--- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java
+++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java
@@ -31,6 +31,7 @@
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageOptions;
 import io.airbyte.commons.json.Jsons;
+import io.airbyte.commons.string.Strings;
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
@@ -47,6 +48,9 @@
 import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.time.Instant;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVPrinter;
@@ -57,11 +61,8 @@ public abstract class GcsStreamCopier implements StreamCopier {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(GcsStreamCopier.class);
 
-  private final String gcsStagingFile;
   private final Storage storageClient;
   private final GcsConfig gcsConfig;
-  private final WriteChannel channel;
-  private final CSVPrinter csvPrinter;
   private final String tmpTableName;
   private final DestinationSyncMode destSyncMode;
   private final String schemaName;
@@ -69,6 +70,10 @@ public abstract class GcsStreamCopier implements StreamCopier {
   private final JdbcDatabase db;
   private final ExtendedNameTransformer nameTransformer;
   private final SqlOperations sqlOperations;
+  private final Set<String> gcsStagingFiles = new HashSet<>();
+  private final HashMap<String, WriteChannel> channels = new HashMap<>();
+  private final HashMap<String, CSVPrinter> csvPrinters = new HashMap<>();
+  private final String stagingFolder;
 
   public GcsStreamCopier(String stagingFolder,
                          DestinationSyncMode destSyncMode,
@@ -82,59 +87,79 @@ public GcsStreamCopier(String stagingFolder,
     this.destSyncMode = destSyncMode;
     this.schemaName = schema;
     this.streamName = streamName;
+    this.stagingFolder = stagingFolder;
     this.db = db;
     this.nameTransformer = nameTransformer;
     this.sqlOperations = sqlOperations;
     this.tmpTableName = nameTransformer.getTmpTableName(streamName);
     this.storageClient = storageClient;
     this.gcsConfig = gcsConfig;
+  }
 
-    this.gcsStagingFile = String.join("/", stagingFolder, schemaName, streamName);
+  private String prepareGcsStagingFile() {
+    return String.join("/", stagingFolder, schemaName, Strings.addRandomSuffix("", "", 3) + "_" + streamName);
+  }
 
-    var blobId = BlobId.of(gcsConfig.getBucketName(), gcsStagingFile);
+  @Override
+  public String prepareStagingFile() {
+    var name = prepareGcsStagingFile();
+    gcsStagingFiles.add(name);
+    var blobId = BlobId.of(gcsConfig.getBucketName(), name);
     var blobInfo = BlobInfo.newBuilder(blobId).build();
     var blob = storageClient.create(blobInfo);
-    this.channel = blob.writer();
+    var channel = blob.writer();
+    channels.put(name, channel);
     OutputStream outputStream = Channels.newOutputStream(channel);
 
     var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8);
     try {
-      this.csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT);
+      csvPrinters.put(name, new CSVPrinter(writer, CSVFormat.DEFAULT));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
+    return name;
   }
 
   @Override
-  public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception {
-    csvPrinter.printRecord(id,
-        Jsons.serialize(recordMessage.getData()),
-        Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt())));
+  public void write(UUID id, AirbyteRecordMessage recordMessage, String gcsFileName) throws Exception {
+    if (csvPrinters.containsKey(gcsFileName)) {
+      csvPrinters.get(gcsFileName).printRecord(id,
+          Jsons.serialize(recordMessage.getData()),
+          Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt())));
+    }
   }
 
   @Override
   public void closeStagingUploader(boolean hasFailed) throws Exception {
     LOGGER.info("Uploading remaining data for {} stream.", streamName);
-    csvPrinter.close();
-    channel.close();
+    for (var csvPrinter : csvPrinters.values()) {
+      csvPrinter.close();
+    }
+    for (var channel : channels.values()) {
+      channel.close();
+    }
     LOGGER.info("All data for {} stream uploaded.", streamName);
   }
 
   @Override
   public void copyStagingFileToTemporaryTable() throws Exception {
     LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, schema: {}.", tmpTableName, streamName, schemaName);
-    copyGcsCsvFileIntoTable(db, getFullGcsPath(gcsConfig.getBucketName(), gcsStagingFile), schemaName, tmpTableName, gcsConfig);
+    for (var gcsStagingFile : gcsStagingFiles) {
+      copyGcsCsvFileIntoTable(db, getFullGcsPath(gcsConfig.getBucketName(), gcsStagingFile), schemaName, tmpTableName, gcsConfig);
+    }
     LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName);
   }
 
   @Override
   public void removeFileAndDropTmpTable() throws Exception {
-    LOGGER.info("Begin cleaning gcs staging file {}.", gcsStagingFile);
-    var blobId = BlobId.of(gcsConfig.getBucketName(), gcsStagingFile);
-    if (storageClient.get(blobId).exists()) {
-      storageClient.delete(blobId);
+    for (var gcsStagingFile : gcsStagingFiles) {
+      LOGGER.info("Begin cleaning gcs staging file {}.", gcsStagingFile);
+      var blobId = BlobId.of(gcsConfig.getBucketName(), gcsStagingFile);
+      if (storageClient.get(blobId).exists()) {
+        storageClient.delete(blobId);
+      }
+      LOGGER.info("GCS staging file {} cleaned.", gcsStagingFile);
     }
-    LOGGER.info("GCS staging file {} cleaned.", gcsStagingFile);
 
     LOGGER.info("Begin cleaning {} tmp table in destination.", tmpTableName);
     sqlOperations.dropTableIfExists(db, schemaName, tmpTableName);
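A quick note on the naming scheme introduced above: each `prepareStagingFile()` call yields `<stagingFolder>/<schema>/<random>_<streamName>`, so repeated calls for the same stream land in distinct GCS blobs. The sketch below is a plain-Java stand-in for `io.airbyte.commons.string.Strings.addRandomSuffix` (whose exact suffix alphabet and format are assumed here, not confirmed by the diff); method names are illustrative.

```java
import java.security.SecureRandom;

public class StagingNameSketch {

  private static final String ALPHANUMERIC = "abcdefghijklmnopqrstuvwxyz0123456789";
  private static final SecureRandom RANDOM = new SecureRandom();

  // Assumed stand-in for Strings.addRandomSuffix("", "", length).
  static String randomSuffix(int length) {
    var sb = new StringBuilder(length);
    for (int i = 0; i < length; i++) {
      sb.append(ALPHANUMERIC.charAt(RANDOM.nextInt(ALPHANUMERIC.length())));
    }
    return sb.toString();
  }

  // Same shape as prepareGcsStagingFile() in the diff above.
  static String prepareGcsStagingFile(String stagingFolder, String schemaName, String streamName) {
    return String.join("/", stagingFolder, schemaName, randomSuffix(3) + "_" + streamName);
  }

  public static void main(String[] args) {
    // Prints something like "f0e3c9aa/public/x7k_users" (suffix is random).
    System.out.println(prepareGcsStagingFile("f0e3c9aa", "public", "users"));
  }
}
```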
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java
index 4fb5ed30625a..b8481fc1e156 100644
--- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java
+++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java
@@ -33,6 +33,8 @@
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import io.airbyte.commons.json.Jsons;
+import io.airbyte.commons.lang.Exceptions;
+import io.airbyte.commons.string.Strings;
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
@@ -45,6 +47,10 @@
 import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.time.Instant;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVPrinter;
@@ -62,12 +68,8 @@ public abstract class S3StreamCopier implements StreamCopier {
   // WARNING: Too large a part size can cause potential OOM errors.
   public static final int DEFAULT_PART_SIZE_MB = 10;
 
-  private final String s3StagingFile;
   private final AmazonS3 s3Client;
   private final S3Config s3Config;
-  private final StreamTransferManager multipartUploadManager;
-  private final MultiPartOutputStream outputStream;
-  private final CSVPrinter csvPrinter;
   private final String tmpTableName;
   private final DestinationSyncMode destSyncMode;
   private final String schemaName;
@@ -75,11 +77,18 @@ public abstract class S3StreamCopier implements StreamCopier {
   private final JdbcDatabase db;
   private final ExtendedNameTransformer nameTransformer;
   private final SqlOperations sqlOperations;
+  private final Set<String> s3StagingFiles = new HashSet<>();
+  private final Map<String, StreamTransferManager> multipartUploadManagers = new HashMap<>();
+  private final Map<String, MultiPartOutputStream> outputStreams = new HashMap<>();
+  private final Map<String, CSVPrinter> csvPrinters = new HashMap<>();
+  private final String s3FileName;
+  private final String stagingFolder;
 
   public S3StreamCopier(String stagingFolder,
                         DestinationSyncMode destSyncMode,
                         String schema,
                         String streamName,
+                        String s3FileName,
                         AmazonS3 client,
                         JdbcDatabase db,
                         S3Config s3Config,
@@ -88,14 +97,24 @@ public S3StreamCopier(String stagingFolder,
     this.destSyncMode = destSyncMode;
     this.schemaName = schema;
     this.streamName = streamName;
+    this.s3FileName = s3FileName;
+    this.stagingFolder = stagingFolder;
     this.db = db;
     this.nameTransformer = nameTransformer;
     this.sqlOperations = sqlOperations;
     this.tmpTableName = nameTransformer.getTmpTableName(streamName);
     this.s3Client = client;
     this.s3Config = s3Config;
+  }
+
+  private String prepareS3StagingFile() {
+    return String.join("/", stagingFolder, schemaName, Strings.addRandomSuffix("", "", 3) + "_" + s3FileName);
+  }
 
-    this.s3StagingFile = prepareS3StagingFile(stagingFolder, streamName);
+  @Override
+  public String prepareStagingFile() {
+    var name = prepareS3StagingFile();
+    s3StagingFiles.add(name);
     LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize());
     // The stream transfer manager lets us greedily stream into S3. The native AWS SDK does not
     // have support for streaming multipart uploads;
@@ -104,75 +123,39 @@ public S3StreamCopier(String stagingFolder,
     // Data is chunked into parts. A part is sent off to a queue to be uploaded once it has reached it's
     // configured part size.
     // Memory consumption is queue capacity * part size = 10 * 10 = 100 MB at current configurations.
-    this.multipartUploadManager =
-        new StreamTransferManager(s3Config.getBucketName(), s3StagingFile, client)
-            .numUploadThreads(DEFAULT_UPLOAD_THREADS)
-            .queueCapacity(DEFAULT_QUEUE_CAPACITY)
-            .partSize(s3Config.getPartSize());
+    var manager = new StreamTransferManager(s3Config.getBucketName(), name, s3Client)
+        .numUploadThreads(DEFAULT_UPLOAD_THREADS)
+        .queueCapacity(DEFAULT_QUEUE_CAPACITY)
+        .partSize(s3Config.getPartSize());
+    multipartUploadManagers.put(name, manager);
+    var outputStream = manager.getMultiPartOutputStreams().get(0);
     // We only need one output stream as we only have one input stream. This is reasonably performant.
     // See the above comment.
-    this.outputStream = multipartUploadManager.getMultiPartOutputStreams().get(0);
-
-    var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8);
-    try {
-      this.csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public S3StreamCopier(String stagingFolder,
-                        DestinationSyncMode destSyncMode,
-                        String schema,
-                        String streamName,
-                        String s3FileName,
-                        AmazonS3 client,
-                        JdbcDatabase db,
-                        S3Config s3Config,
-                        ExtendedNameTransformer nameTransformer,
-                        SqlOperations sqlOperations) {
-    this.destSyncMode = destSyncMode;
-    this.schemaName = schema;
-    this.streamName = streamName;
-    this.db = db;
-    this.nameTransformer = nameTransformer;
-    this.sqlOperations = sqlOperations;
-    this.tmpTableName = nameTransformer.getTmpTableName(streamName);
-    this.s3Client = client;
-    this.s3Config = s3Config;
-
-    this.s3StagingFile = prepareS3StagingFile(stagingFolder, s3FileName);
-    LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize());
-    this.multipartUploadManager =
-        new StreamTransferManager(s3Config.getBucketName(), s3StagingFile, client)
-            .numUploadThreads(DEFAULT_UPLOAD_THREADS)
-            .queueCapacity(DEFAULT_QUEUE_CAPACITY)
-            .partSize(s3Config.getPartSize());
-    this.outputStream = multipartUploadManager.getMultiPartOutputStreams().get(0);
-
+    outputStreams.put(name, outputStream);
     var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8);
     try {
-      this.csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT);
+      csvPrinters.put(name, new CSVPrinter(writer, CSVFormat.DEFAULT));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-  }
-
-  private String prepareS3StagingFile(String stagingFolder, String s3FileName) {
-    return String.join("/", stagingFolder, schemaName, s3FileName);
+    return name;
   }
 
   @Override
-  public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception {
-    csvPrinter.printRecord(id,
-        Jsons.serialize(recordMessage.getData()),
-        Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt())));
+  public void write(UUID id, AirbyteRecordMessage recordMessage, String s3FileName) throws Exception {
+    if (csvPrinters.containsKey(s3FileName)) {
+      csvPrinters.get(s3FileName).printRecord(id,
+          Jsons.serialize(recordMessage.getData()),
+          Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt())));
+    }
   }
 
   @Override
   public void closeStagingUploader(boolean hasFailed) throws Exception {
     if (hasFailed) {
-      multipartUploadManager.abort();
+      for (var multipartUploadManager : multipartUploadManagers.values()) {
+        multipartUploadManager.abort();
+      }
     }
     closeAndWaitForUpload();
   }
@@ -192,7 +175,9 @@ public void createTemporaryTable() throws Exception {
   @Override
   public void copyStagingFileToTemporaryTable() throws Exception {
     LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, schema: {}, .", tmpTableName, streamName, schemaName);
-    copyS3CsvFileIntoTable(db, getFullS3Path(s3Config.getBucketName(), s3StagingFile), schemaName, tmpTableName, s3Config);
+    s3StagingFiles.forEach(s3StagingFile -> Exceptions.toRuntime(() -> {
+      copyS3CsvFileIntoTable(db, getFullS3Path(s3Config.getBucketName(), s3StagingFile), schemaName, tmpTableName, s3Config);
+    }));
     LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName);
   }
 
@@ -220,11 +205,13 @@ public String generateMergeStatement(String destTableName) {
 
   @Override
   public void removeFileAndDropTmpTable() throws Exception {
-    LOGGER.info("Begin cleaning s3 staging file {}.", s3StagingFile);
-    if (s3Client.doesObjectExist(s3Config.getBucketName(), s3StagingFile)) {
-      s3Client.deleteObject(s3Config.getBucketName(), s3StagingFile);
-    }
-    LOGGER.info("S3 staging file {} cleaned.", s3StagingFile);
+    s3StagingFiles.forEach(s3StagingFile -> {
+      LOGGER.info("Begin cleaning s3 staging file {}.", s3StagingFile);
+      if (s3Client.doesObjectExist(s3Config.getBucketName(), s3StagingFile)) {
+        s3Client.deleteObject(s3Config.getBucketName(), s3StagingFile);
+      }
+      LOGGER.info("S3 staging file {} cleaned.", s3StagingFile);
+    });
 
     LOGGER.info("Begin cleaning {} tmp table in destination.", tmpTableName);
     sqlOperations.dropTableIfExists(db, schemaName, tmpTableName);
@@ -240,9 +227,15 @@ private static String getFullS3Path(String s3BucketName, String s3StagingFile) {
    */
   private void closeAndWaitForUpload() throws IOException {
     LOGGER.info("Uploading remaining data for {} stream.", streamName);
-    csvPrinter.close();
-    outputStream.close();
-    multipartUploadManager.complete();
+    for (var csvPrinter : csvPrinters.values()) {
+      csvPrinter.close();
+    }
+    for (var outputStream : outputStreams.values()) {
+      outputStream.close();
+    }
+    for (var multipartUploadManager : multipartUploadManagers.values()) {
+      multipartUploadManager.complete();
+    }
     LOGGER.info("All data for {} stream uploaded.", streamName);
   }