From c283d9d159a9412f028362e6c0b9179d32b014a9 Mon Sep 17 00:00:00 2001 From: VitaliiMaltsev <39538064+VitaliiMaltsev@users.noreply.github.com> Date: Mon, 20 Jun 2022 14:56:33 +0300 Subject: [PATCH] Deprecate PART_SIZE_MB in connectors using S3/GCS storage (#13753) * Removed part_size from connectors that use StreamTransferManager * fixed S3DestinationConfigTest * fixed S3JsonlFormatConfigTest * update changelog and bump version * auto-bump connector version * auto-bump connector version * auto-bump connector version * auto-bump connector version * update changelog and bump version for Redshift and Snowflake destinations * auto-bump connector version * fix GCS staging test * fix GCS staging test * auto-bump connector version Co-authored-by: Octavia Squidington III --- .../seed/destination_definitions.yaml | 12 +- .../resources/seed/destination_specs.yaml | 107 ++---------------- .../src/main/resources/spec.json | 12 +- ...ormalizedGcsDestinationAcceptanceTest.java | 1 - .../destination/bigquery/BigQueryConsts.java | 1 - .../destination/bigquery/BigQueryUtils.java | 6 +- .../src/main/resources/spec.json | 12 +- .../BigQueryDestinationAcceptanceTest.java | 2 +- .../BigQueryGcsDestinationAcceptanceTest.java | 1 - .../bigquery/BigQueryTestDataComparator.java | 1 + .../connectors/destination-gcs/Dockerfile | 2 +- .../destination/gcs/GcsDestinationConfig.java | 1 - .../destination/gcs/avro/GcsAvroWriter.java | 4 +- .../destination/gcs/csv/GcsCsvWriter.java | 4 +- .../destination/gcs/jsonl/GcsJsonlWriter.java | 1 - .../src/main/resources/spec.json | 21 ---- .../gcs/avro/GcsAvroFormatConfigTest.java | 12 +- .../gcs/csv/GcsCsvFormatConfigTest.java | 12 +- .../gcs/jsonl/GcsJsonlFormatConfigTest.java | 12 +- .../jdbc/copy/s3/S3StreamCopier.java | 3 +- .../jdbc/copy/s3/S3StreamCopierTest.java | 4 +- .../destination-redshift/Dockerfile | 2 +- .../redshift/RedshiftDestination.java | 17 +-- .../RedshiftStagingS3Destination.java | 4 +- .../RedshiftDestinationConstants.java | 8 +- .../redshift/validator/RedshiftUtil.java | 8 +- .../src/main/resources/spec.json | 17 ++- ...dshiftInsertDestinationAcceptanceTest.java | 2 - .../redshift/RedshiftDestinationTest.java | 1 + .../copiers/RedshiftStreamCopierTest.java | 2 - .../connectors/destination-s3/Dockerfile | 2 +- .../destination/s3/S3DestinationConfig.java | 23 +--- .../s3/S3DestinationConstants.java | 5 - .../destination/s3/S3FormatConfig.java | 2 - .../destination/s3/S3StorageOperations.java | 2 +- .../s3/avro/S3AvroFormatConfig.java | 14 +-- .../destination/s3/avro/S3AvroWriter.java | 1 - .../destination/s3/csv/S3CsvFormatConfig.java | 16 +-- .../destination/s3/csv/S3CsvWriter.java | 1 - .../s3/jsonl/S3JsonlFormatConfig.java | 15 +-- .../destination/s3/jsonl/S3JsonlWriter.java | 1 - .../s3/parquet/S3ParquetFormatConfig.java | 7 -- .../s3/S3DestinationConfigTest.java | 3 - .../s3/avro/S3AvroFormatConfigTest.java | 12 +- .../s3/csv/S3CsvFormatConfigTest.java | 11 +- .../destination/s3/csv/S3CsvWriterTest.java | 6 +- .../s3/jsonl/S3JsonlFormatConfigTest.java | 12 +- .../destination-snowflake/Dockerfile | 2 +- .../SnowflakeGcsStagingSqlOperations.java | 11 +- .../src/main/resources/spec.json | 12 +- .../SnowflakeS3StreamCopierTest.java | 3 - .../src/test/resources/copy_s3_config.json | 3 +- .../resources/copy_s3_encrypted_config.json | 1 - docs/integrations/destinations/bigquery.md | 6 +- docs/integrations/destinations/gcs.md | 1 + docs/integrations/destinations/redshift.md | 1 + docs/integrations/destinations/s3.md | 1 +
docs/integrations/destinations/snowflake.md | 1 + 58 files changed, 118 insertions(+), 349 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index d0a18f0cbb63..eb1a0cb131f8 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -27,7 +27,7 @@ - name: BigQuery destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 dockerRepository: airbyte/destination-bigquery - dockerImageTag: 1.1.8 + dockerImageTag: 1.1.9 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery icon: bigquery.svg resourceRequirements: @@ -40,7 +40,7 @@ - name: BigQuery (denormalized typed struct) destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496 dockerRepository: airbyte/destination-bigquery-denormalized - dockerImageTag: 1.1.8 + dockerImageTag: 1.1.9 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery icon: bigquery.svg resourceRequirements: @@ -100,7 +100,7 @@ - name: Google Cloud Storage (GCS) destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a dockerRepository: airbyte/destination-gcs - dockerImageTag: 0.2.7 + dockerImageTag: 0.2.8 documentationUrl: https://docs.airbyte.io/integrations/destinations/gcs icon: googlecloudstorage.svg resourceRequirements: @@ -225,7 +225,7 @@ - name: Redshift destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc dockerRepository: airbyte/destination-redshift - dockerImageTag: 0.3.39 + dockerImageTag: 0.3.40 documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift icon: redshift.svg resourceRequirements: @@ -244,7 +244,7 @@ - name: S3 destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 dockerRepository: airbyte/destination-s3 - dockerImageTag: 0.3.7 + dockerImageTag: 0.3.8 documentationUrl: https://docs.airbyte.io/integrations/destinations/s3 icon: s3.svg resourceRequirements: @@ -264,7 +264,7 @@ - name: Snowflake destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba dockerRepository: airbyte/destination-snowflake - dockerImageTag: 0.4.28 + dockerImageTag: 0.4.29 documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake icon: snowflake.svg resourceRequirements: diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 5134c2cd9bf1..fda39c9d11e4 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -285,7 +285,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-bigquery:1.1.8" +- dockerImage: "airbyte/destination-bigquery:1.1.9" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery" connectionSpecification: @@ -438,19 +438,6 @@ examples: - "data_sync/test" order: 3 - part_size_mb: - title: "Block Size (MB) for GCS Multipart Upload (Optional)" - description: "This is the size of a \"Part\" being buffered in memory.\ - \ It limits the memory usage when writing. Larger values will allow\ - \ to upload a bigger files and improve the speed, but consumes more\ - \ memory. Allowed values: min=5MB, max=525MB Default: 5MB." 
- type: "integer" - default: 5 - minimum: 5 - maximum: 525 - examples: - - 5 - order: 4 keep_files_in_gcs-bucket: type: "string" description: "This upload method is supposed to temporary store records\ @@ -462,7 +449,7 @@ enum: - "Delete all tmp files from GCS" - "Keep all tmp files in GCS" - order: 5 + order: 4 credentials_json: type: "string" description: "The contents of the JSON service account key. Check out the\ @@ -510,7 +497,7 @@ - "overwrite" - "append" - "append_dedup" -- dockerImage: "airbyte/destination-bigquery-denormalized:1.1.8" +- dockerImage: "airbyte/destination-bigquery-denormalized:1.1.9" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery" connectionSpecification: @@ -627,19 +614,6 @@ examples: - "data_sync/test" order: 3 - part_size_mb: - title: "Block Size (MB) for GCS Multipart Upload (Optional)" - description: "This is the size of a \"Part\" being buffered in memory.\ - \ It limits the memory usage when writing. Larger values will allow\ - \ to upload a bigger files and improve the speed, but consumes more\ - \ memory. Allowed values: min=5MB, max=525MB Default: 5MB." - type: "integer" - default: 5 - minimum: 5 - maximum: 525 - examples: - - 5 - order: 4 keep_files_in_gcs-bucket: type: "string" description: "This upload method is supposed to temporary store records\ @@ -651,7 +625,7 @@ enum: - "Delete all tmp files from GCS" - "Keep all tmp files in GCS" - order: 5 + order: 4 credentials_json: type: "string" description: "The contents of the JSON service account key. Check out the\ @@ -1486,7 +1460,7 @@ - "overwrite" - "append" supportsNamespaces: true -- dockerImage: "airbyte/destination-gcs:0.2.7" +- dockerImage: "airbyte/destination-gcs:0.2.8" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/gcs" connectionSpecification: @@ -1720,16 +1694,6 @@ enum: - "snappy" default: "snappy" - part_size_mb: - title: "Block Size (MB) for GCS multipart upload (Optional)" - description: "This is the size of a \"Part\" being buffered in memory.\ - \ It limits the memory usage when writing. Larger values will allow\ - \ to upload a bigger files and improve the speed, but consumes9\ - \ more memory. Allowed values: min=5MB, max=525MB Default: 5MB." - type: "integer" - default: 5 - examples: - - 5 - title: "CSV: Comma-Separated Values" required: - "format_type" @@ -1748,16 +1712,6 @@ enum: - "No flattening" - "Root level flattening" - part_size_mb: - title: "Block Size (MB) for GCS multipart upload (Optional)" - description: "This is the size of a \"Part\" being buffered in memory.\ - \ It limits the memory usage when writing. Larger values will allow\ - \ to upload a bigger files and improve the speed, but consumes9\ - \ more memory. Allowed values: min=5MB, max=525MB Default: 5MB." - type: "integer" - default: 5 - examples: - - 5 compression: title: "Compression" type: "object" @@ -1792,16 +1746,6 @@ enum: - "JSONL" default: "JSONL" - part_size_mb: - title: "Block Size (MB) for GCS multipart upload (Optional)" - description: "This is the size of a \"Part\" being buffered in memory.\ - \ It limits the memory usage when writing. Larger values will allow\ - \ to upload a bigger files and improve the speed, but consumes9\ - \ more memory. Allowed values: min=5MB, max=525MB Default: 5MB." 
- type: "integer" - default: 5 - examples: - - 5 compression: title: "Compression" type: "object" @@ -3678,7 +3622,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-redshift:0.3.39" +- dockerImage: "airbyte/destination-redshift:0.3.40" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift" connectionSpecification: @@ -3822,22 +3766,6 @@ \ key." title: "S3 Access Key" airbyte_secret: true - part_size: - type: "integer" - minimum: 10 - maximum: 100 - examples: - - "10" - description: "Increase this if syncing tables larger than 100GB. Only\ - \ relevant for COPY. Files are streamed to S3 in parts. This determines\ - \ the size of each part, in MBs. As S3 has a limit of 10,000 parts\ - \ per file, part size affects the table size. This is 10MB by default,\ - \ resulting in a default limit of 100GB tables. Note: a larger part\ - \ size will result in larger memory requirements. A rule of thumb\ - \ is to multiply the part size by 10 to get the memory requirement.\ - \ Modify this with care. See docs for details." - title: "Stream Part Size (Optional)" purge_staging_data: title: "Purge Staging Files and Tables (Optional)" type: "boolean" @@ -3895,7 +3823,7 @@ supported_destination_sync_modes: - "append" - "overwrite" -- dockerImage: "airbyte/destination-s3:0.3.7" +- dockerImage: "airbyte/destination-s3:0.3.8" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/s3" connectionSpecification: @@ -4314,7 +4242,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-snowflake:0.4.28" +- dockerImage: "airbyte/destination-snowflake:0.4.29" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/snowflake" connectionSpecification: @@ -4546,21 +4474,6 @@ title: "S3 Access Key" airbyte_secret: true order: 4 - part_size: - type: "integer" - default: 5 - examples: - - 5 - description: "Optional. Increase this if syncing tables larger than\ - \ 100GB. Only relevant for COPY. Files are streamed to S3 in parts.\ - \ This determines the size of each part, in MBs. As S3 has a limit\ - \ of 10,000 parts per file, part size affects the table size. This\ - \ is 10MB by default, resulting in a default limit of 100GB tables.\ - \ Note, a larger part size will result in larger memory requirements.\ - \ A rule of thumb is to multiply the part size by 10 to get the\ - \ memory requirement. Modify this with care." - title: "Stream Part Size" - order: 5 purge_staging_data: title: "Purge Staging Files and Tables" type: "boolean" @@ -4568,14 +4481,14 @@ \ the sync. See the docs for details. Only relevant for COPY. Defaults\ \ to true." default: true - order: 6 + order: 5 encryption: title: "Encryption" type: "object" description: "How to encrypt the staging data" default: encryption_type: "none" - order: 7 + order: 6 oneOf: - title: "No encryption" description: "Staging data will be stored in plaintext." 
diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/resources/spec.json index fb64ce159db0..27c08f29e810 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/resources/spec.json @@ -112,16 +112,6 @@ "examples": ["data_sync/test"], "order": 3 }, - "part_size_mb": { - "title": "Block Size (MB) for GCS Multipart Upload (Optional)", - "description": "This is the size of a \"Part\" being buffered in memory. It limits the memory usage when writing. Larger values will allow to upload a bigger files and improve the speed, but consumes more memory. Allowed values: min=5MB, max=525MB Default: 5MB.", - "type": "integer", - "default": 5, - "minimum": 5, - "maximum": 525, - "examples": [5], - "order": 4 - }, "keep_files_in_gcs-bucket": { "type": "string", "description": "This upload method is supposed to temporary store records in GCS bucket. By this select you can chose if these records should be removed from GCS when migration has finished. The default \"Delete all tmp files from GCS\" value is used if not set explicitly.", @@ -131,7 +121,7 @@ "Delete all tmp files from GCS", "Keep all tmp files in GCS" ], - "order": 5 + "order": 4 } } } diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationAcceptanceTest.java index 8bb59272fca0..23233ad2543f 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationAcceptanceTest.java @@ -37,7 +37,6 @@ protected JsonNode createConfig() throws IOException { .put(BigQueryConsts.METHOD, BigQueryConsts.GCS_STAGING) .put(BigQueryConsts.GCS_BUCKET_NAME, gcsConfigFromSecretFile.get(BigQueryConsts.GCS_BUCKET_NAME)) .put(BigQueryConsts.GCS_BUCKET_PATH, gcsConfigFromSecretFile.get(BigQueryConsts.GCS_BUCKET_PATH).asText() + System.currentTimeMillis()) - .put(BigQueryConsts.PART_SIZE, gcsConfigFromSecretFile.get(BigQueryConsts.PART_SIZE)) .put(BigQueryConsts.CREDENTIAL, credential) .build()); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryConsts.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryConsts.java index 3669d7680d89..016c8365ad30 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryConsts.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryConsts.java @@ -23,7 +23,6 @@ public class BigQueryConsts { public static final String FORMAT = "format"; public static final String KEEP_GCS_FILES = "keep_files_in_gcs-bucket"; public static final String 
KEEP_GCS_FILES_VAL = "Keep all tmp files in GCS"; - public static final String PART_SIZE = "part_size_mb"; public static final String NAMESPACE_PREFIX = "n"; diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java index 3426c9719722..6ae70cd99629 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java @@ -143,8 +143,7 @@ public static JsonNode getGcsJsonNodeConfig(final JsonNode config) { .put(BigQueryConsts.CREDENTIAL, loadingMethod.get(BigQueryConsts.CREDENTIAL)) .put(BigQueryConsts.FORMAT, Jsons.deserialize("{\n" + " \"format_type\": \"CSV\",\n" - + " \"flattening\": \"No flattening\",\n" - + " \"part_size_mb\": \"" + loadingMethod.get(BigQueryConsts.PART_SIZE) + "\"\n" + + " \"flattening\": \"No flattening\"\n" + "}")) .build()); @@ -165,8 +164,7 @@ public static JsonNode getGcsAvroJsonNodeConfig(final JsonNode config) { .put(BigQueryConsts.CREDENTIAL, loadingMethod.get(BigQueryConsts.CREDENTIAL)) .put(BigQueryConsts.FORMAT, Jsons.deserialize("{\n" + " \"format_type\": \"AVRO\",\n" - + " \"flattening\": \"No flattening\",\n" - + " \"part_size_mb\": \"" + loadingMethod.get(BigQueryConsts.PART_SIZE) + "\"\n" + + " \"flattening\": \"No flattening\"\n" + "}")) .build()); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-bigquery/src/main/resources/spec.json index d26e17dd7ce8..b806ea40d4a3 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/resources/spec.json @@ -149,16 +149,6 @@ "examples": ["data_sync/test"], "order": 3 }, - "part_size_mb": { - "title": "Block Size (MB) for GCS Multipart Upload (Optional)", - "description": "This is the size of a \"Part\" being buffered in memory. It limits the memory usage when writing. Larger values will allow to upload a bigger files and improve the speed, but consumes more memory. Allowed values: min=5MB, max=525MB Default: 5MB.", - "type": "integer", - "default": 5, - "minimum": 5, - "maximum": 525, - "examples": [5], - "order": 4 - }, "keep_files_in_gcs-bucket": { "type": "string", "description": "This upload method is supposed to temporary store records in GCS bucket. By this select you can chose if these records should be removed from GCS when migration has finished. 
The default \"Delete all tmp files from GCS\" value is used if not set explicitly.", @@ -168,7 +158,7 @@ "Delete all tmp files from GCS", "Keep all tmp files in GCS" ], - "order": 5 + "order": 4 } } } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java index 61f9be7d225e..edda58225624 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java @@ -181,7 +181,7 @@ private List retrieveRecordsFromTable(final String tableName, final St final FieldList fields = queryResults.getSchema().getFields(); BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations(); - return Streams.stream(queryResults.iterateAll()) + return Streams.stream(queryResults.iterateAll()) .map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList()); } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationAcceptanceTest.java index 3fbffdc388b8..9226bec91b69 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationAcceptanceTest.java @@ -45,7 +45,6 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception { .put(BigQueryConsts.METHOD, BigQueryConsts.GCS_STAGING) .put(BigQueryConsts.GCS_BUCKET_NAME, gcsConfigFromSecretFile.get(BigQueryConsts.GCS_BUCKET_NAME)) .put(BigQueryConsts.GCS_BUCKET_PATH, gcsConfigFromSecretFile.get(BigQueryConsts.GCS_BUCKET_PATH).asText() + System.currentTimeMillis()) - .put(BigQueryConsts.PART_SIZE, gcsConfigFromSecretFile.get(BigQueryConsts.PART_SIZE)) .put(BigQueryConsts.CREDENTIAL, credential) .build()); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryTestDataComparator.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryTestDataComparator.java index 8c7be65f6fad..392d0687142f 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryTestDataComparator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryTestDataComparator.java @@ -107,4 +107,5 @@ protected void compareObjects(JsonNode expectedObject, JsonNode actualObject) { JsonNode actualJsonNode = (actualObject.isTextual() ? 
Jsons.deserialize(actualObject.textValue()) : actualObject); super.compareObjects(expectedObject, actualJsonNode); } + } diff --git a/airbyte-integrations/connectors/destination-gcs/Dockerfile b/airbyte-integrations/connectors/destination-gcs/Dockerfile index 4a234d7c6827..a486f174e069 100644 --- a/airbyte-integrations/connectors/destination-gcs/Dockerfile +++ b/airbyte-integrations/connectors/destination-gcs/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-gcs COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.2.7 +LABEL io.airbyte.version=0.2.8 LABEL io.airbyte.name=airbyte/destination-gcs diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestinationConfig.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestinationConfig.java index ba070797edd9..16ddb90ce146 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestinationConfig.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestinationConfig.java @@ -39,7 +39,6 @@ public GcsDestinationConfig(final String bucketName, bucketRegion, S3DestinationConstants.DEFAULT_PATH_FORMAT, credentialConfig.getS3CredentialConfig().orElseThrow(), - S3DestinationConstants.DEFAULT_PART_SIZE_MB, formatConfig, null); diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriter.java index 4771910da809..592e1a74ce35 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriter.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriter.java @@ -4,6 +4,8 @@ package io.airbyte.integrations.destination.gcs.avro; +import static io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB; + import alex.mojaki.s3upload.MultiPartOutputStream; import alex.mojaki.s3upload.StreamTransferManager; import com.amazonaws.services.s3.AmazonS3; @@ -76,7 +78,7 @@ public GcsAvroWriter(final GcsDestinationConfig config, this.avroRecordFactory = new AvroRecordFactory(schema, converter); this.uploadManager = StreamTransferManagerFactory .create(config.getBucketName(), objectKey, s3Client) - .setPartSize(config.getFormatConfig().getPartSize()) + .setPartSize((long) DEFAULT_PART_SIZE_MB) .get(); // We only need one output stream as we only have one input stream. This is reasonably performant. 
this.outputStream = uploadManager.getMultiPartOutputStreams().get(0); diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java index e8104e1fc2f8..1a50b4636485 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java @@ -4,6 +4,8 @@ package io.airbyte.integrations.destination.gcs.csv; +import static io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB; + import alex.mojaki.s3upload.MultiPartOutputStream; import alex.mojaki.s3upload.StreamTransferManager; import com.amazonaws.services.s3.AmazonS3; @@ -58,7 +60,7 @@ public GcsCsvWriter(final GcsDestinationConfig config, this.uploadManager = StreamTransferManagerFactory .create(config.getBucketName(), objectKey, s3Client) - .setPartSize(config.getFormatConfig().getPartSize()) + .setPartSize((long) DEFAULT_PART_SIZE_MB) .get(); // We only need one output stream as we only have one input stream. This is reasonably performant. this.outputStream = uploadManager.getMultiPartOutputStreams().get(0); diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/jsonl/GcsJsonlWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/jsonl/GcsJsonlWriter.java index 7590c39d8ce1..5a930f267309 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/jsonl/GcsJsonlWriter.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/jsonl/GcsJsonlWriter.java @@ -54,7 +54,6 @@ public GcsJsonlWriter(final GcsDestinationConfig config, this.uploadManager = StreamTransferManagerFactory .create(config.getBucketName(), objectKey, s3Client) - .setPartSize(config.getFormatConfig().getPartSize()) .get(); // We only need one output stream as we only have one input stream. This is reasonably performant. diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-gcs/src/main/resources/spec.json index b0d566f0eda9..1273b661e731 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-gcs/src/main/resources/spec.json @@ -226,13 +226,6 @@ } } ] - }, - "part_size_mb": { - "title": "Block Size (MB) for GCS multipart upload (Optional)", - "description": "This is the size of a \"Part\" being buffered in memory. It limits the memory usage when writing. Larger values will allow to upload a bigger files and improve the speed, but consumes9 more memory. Allowed values: min=5MB, max=525MB Default: 5MB.", - "type": "integer", - "default": 5, - "examples": [5] } } }, @@ -252,13 +245,6 @@ "default": "No flattening", "enum": ["No flattening", "Root level flattening"] }, - "part_size_mb": { - "title": "Block Size (MB) for GCS multipart upload (Optional)", - "description": "This is the size of a \"Part\" being buffered in memory. It limits the memory usage when writing. 
Larger values will allow to upload a bigger files and improve the speed, but consumes9 more memory. Allowed values: min=5MB, max=525MB Default: 5MB.", - "type": "integer", - "default": 5, - "examples": [5] - }, "compression": { "title": "Compression", "type": "object", @@ -299,13 +285,6 @@ "enum": ["JSONL"], "default": "JSONL" }, - "part_size_mb": { - "title": "Block Size (MB) for GCS multipart upload (Optional)", - "description": "This is the size of a \"Part\" being buffered in memory. It limits the memory usage when writing. Larger values will allow to upload a bigger files and improve the speed, but consumes9 more memory. Allowed values: min=5MB, max=525MB Default: 5MB.", - "type": "integer", - "default": 5, - "examples": [5] - }, "compression": { "title": "Compression", "type": "object", diff --git a/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroFormatConfigTest.java b/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroFormatConfigTest.java index 5fc6a590b378..c2f0aa5c1791 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroFormatConfigTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroFormatConfigTest.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.destination.gcs.avro; import static com.amazonaws.services.s3.internal.Constants.MB; +import static io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB; import static org.junit.jupiter.api.Assertions.assertEquals; import alex.mojaki.s3upload.StreamTransferManager; @@ -13,7 +14,6 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; import io.airbyte.integrations.destination.gcs.util.ConfigTestUtils; -import io.airbyte.integrations.destination.s3.S3DestinationConstants; import io.airbyte.integrations.destination.s3.S3FormatConfig; import io.airbyte.integrations.destination.s3.avro.S3AvroFormatConfig; import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory; @@ -104,8 +104,7 @@ public void testParseCodecConfigInvalid() { public void testHandlePartSizeConfig() throws IllegalAccessException { final JsonNode config = ConfigTestUtils.getBaseConfig(Jsons.deserialize("{\n" - + " \"format_type\": \"AVRO\",\n" - + " \"part_size_mb\": 6\n" + + " \"format_type\": \"AVRO\"\n" + "}")); final GcsDestinationConfig gcsDestinationConfig = GcsDestinationConfig @@ -114,15 +113,13 @@ public void testHandlePartSizeConfig() throws IllegalAccessException { final S3FormatConfig formatConfig = gcsDestinationConfig.getFormatConfig(); assertEquals("AVRO", formatConfig.getFormat().name()); - assertEquals(6, formatConfig.getPartSize()); // Assert that is set properly in config final StreamTransferManager streamTransferManager = StreamTransferManagerFactory .create(gcsDestinationConfig.getBucketName(), "objectKey", null) - .setPartSize(gcsDestinationConfig.getFormatConfig().getPartSize()) .get(); final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true); - assertEquals(MB * 6, partSizeBytes); + assertEquals(MB * DEFAULT_PART_SIZE_MB, partSizeBytes); } @Test @@ -138,11 +135,10 @@ public void testHandleAbsenceOfPartSizeConfig() throws IllegalAccessException { final StreamTransferManager streamTransferManager = 
StreamTransferManagerFactory .create(gcsDestinationConfig.getBucketName(), "objectKey", null) - .setPartSize(gcsDestinationConfig.getFormatConfig().getPartSize()) .get(); final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true); - assertEquals(MB * S3DestinationConstants.DEFAULT_PART_SIZE_MB, partSizeBytes); + assertEquals(MB * DEFAULT_PART_SIZE_MB, partSizeBytes); } } diff --git a/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvFormatConfigTest.java b/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvFormatConfigTest.java index 6df74ec8ca2c..56b948967fe1 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvFormatConfigTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvFormatConfigTest.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.destination.gcs.csv; import static com.amazonaws.services.s3.internal.Constants.MB; +import static io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -13,7 +14,6 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; import io.airbyte.integrations.destination.gcs.util.ConfigTestUtils; -import io.airbyte.integrations.destination.s3.S3DestinationConstants; import io.airbyte.integrations.destination.s3.S3FormatConfig; import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig.Flattening; import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory; @@ -41,8 +41,7 @@ public void testHandlePartSizeConfig() throws IllegalAccessException { final JsonNode config = ConfigTestUtils.getBaseConfig(Jsons.deserialize("{\n" + " \"format_type\": \"CSV\",\n" - + " \"flattening\": \"Root level flattening\",\n" - + " \"part_size_mb\": 6\n" + + " \"flattening\": \"Root level flattening\"\n" + "}")); final GcsDestinationConfig gcsDestinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config); @@ -50,15 +49,13 @@ public void testHandlePartSizeConfig() throws IllegalAccessException { final S3FormatConfig formatConfig = gcsDestinationConfig.getFormatConfig(); assertEquals("CSV", formatConfig.getFormat().name()); - assertEquals(6, formatConfig.getPartSize()); // Assert that is set properly in config final StreamTransferManager streamTransferManager = StreamTransferManagerFactory .create(gcsDestinationConfig.getBucketName(), "objectKey", null) - .setPartSize(gcsDestinationConfig.getFormatConfig().getPartSize()) .get(); final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true); - assertEquals(MB * 6, partSizeBytes); + assertEquals(MB * DEFAULT_PART_SIZE_MB, partSizeBytes); } @Test @@ -74,11 +71,10 @@ public void testHandleAbsenceOfPartSizeConfig() throws IllegalAccessException { final StreamTransferManager streamTransferManager = StreamTransferManagerFactory .create(gcsDestinationConfig.getBucketName(), "objectKey", null) - .setPartSize(gcsDestinationConfig.getFormatConfig().getPartSize()) .get(); final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true); - assertEquals(MB * S3DestinationConstants.DEFAULT_PART_SIZE_MB, 
partSizeBytes); + assertEquals(MB * DEFAULT_PART_SIZE_MB, partSizeBytes); } } diff --git a/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/jsonl/GcsJsonlFormatConfigTest.java b/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/jsonl/GcsJsonlFormatConfigTest.java index aa89beeb318a..8b8ddbb08a24 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/jsonl/GcsJsonlFormatConfigTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/jsonl/GcsJsonlFormatConfigTest.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.destination.gcs.jsonl; import static com.amazonaws.services.s3.internal.Constants.MB; +import static io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB; import static org.junit.jupiter.api.Assertions.assertEquals; import alex.mojaki.s3upload.StreamTransferManager; @@ -12,7 +13,6 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; import io.airbyte.integrations.destination.gcs.util.ConfigTestUtils; -import io.airbyte.integrations.destination.s3.S3DestinationConstants; import io.airbyte.integrations.destination.s3.S3FormatConfig; import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory; import org.apache.commons.lang3.reflect.FieldUtils; @@ -26,8 +26,7 @@ public class GcsJsonlFormatConfigTest { public void testHandlePartSizeConfig() throws IllegalAccessException { final JsonNode config = ConfigTestUtils.getBaseConfig(Jsons.deserialize("{\n" - + " \"format_type\": \"JSONL\",\n" - + " \"part_size_mb\": 6\n" + + " \"format_type\": \"JSONL\"\n" + "}")); final GcsDestinationConfig gcsDestinationConfig = GcsDestinationConfig @@ -36,16 +35,14 @@ public void testHandlePartSizeConfig() throws IllegalAccessException { final S3FormatConfig formatConfig = gcsDestinationConfig.getFormatConfig(); assertEquals("JSONL", formatConfig.getFormat().name()); - assertEquals(6, formatConfig.getPartSize()); // Assert that is set properly in config final StreamTransferManager streamTransferManager = StreamTransferManagerFactory .create(gcsDestinationConfig.getBucketName(), "objectKey", null) - .setPartSize(gcsDestinationConfig.getFormatConfig().getPartSize()) .get(); final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true); - assertEquals(MB * 6, partSizeBytes); + assertEquals(MB * DEFAULT_PART_SIZE_MB, partSizeBytes); } @Test @@ -61,11 +58,10 @@ public void testHandleAbsenceOfPartSizeConfig() throws IllegalAccessException { final StreamTransferManager streamTransferManager = StreamTransferManagerFactory .create(gcsDestinationConfig.getBucketName(), "objectKey", null) - .setPartSize(gcsDestinationConfig.getFormatConfig().getPartSize()) .get(); final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true); - assertEquals(MB * S3DestinationConstants.DEFAULT_PART_SIZE_MB, partSizeBytes); + assertEquals(MB * DEFAULT_PART_SIZE_MB, partSizeBytes); } } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java index a4d317fb09f8..de4b55347423 100644 --- 
a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java @@ -94,12 +94,11 @@ public S3StreamCopier(final String stagingFolder, @Override public String prepareStagingFile() { if (partsAddedToCurrentFile == 0) { - LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize()); try { // The Flattening value is actually ignored, because we pass an explicit CsvSheetGenerator. So just // pass in null. - final S3FormatConfig csvFormatConfig = new S3CsvFormatConfig(null, (long) s3Config.getPartSize(), CompressionType.NO_COMPRESSION); + final S3FormatConfig csvFormatConfig = new S3CsvFormatConfig(null, CompressionType.NO_COMPRESSION); final S3DestinationConfig writerS3Config = S3DestinationConfig.create(s3Config).withFormatConfig(csvFormatConfig).get(); final S3CsvWriter writer = new S3CsvWriter.Builder( writerS3Config, diff --git a/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierTest.java b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierTest.java index 5344366bd828..a0f487d59018 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierTest.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierTest.java @@ -43,14 +43,12 @@ public class S3StreamCopierTest { private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamCopierTest.class); - private static final int PART_SIZE = 5; private static final S3DestinationConfig S3_CONFIG = S3DestinationConfig.create( "fake-bucket", "fake-bucketPath", "fake-region") .withEndpoint("fake-endpoint") .withAccessKeyCredential("fake-access-key-id", "fake-secret-access-key") - .withPartSize(PART_SIZE) .get(); private static final ConfiguredAirbyteStream CONFIGURED_STREAM = new ConfiguredAirbyteStream() .withDestinationSyncMode(DestinationSyncMode.APPEND) @@ -178,7 +176,7 @@ public void createSequentialStagingFiles_when_multipleFilesRequested() { private void checkCsvWriterArgs(final S3CsvWriterArguments args) { final S3DestinationConfig s3Config = S3DestinationConfig.create(S3_CONFIG) - .withFormatConfig(new S3CsvFormatConfig(null, (long) PART_SIZE, CompressionType.NO_COMPRESSION)) + .withFormatConfig(new S3CsvFormatConfig(null, CompressionType.NO_COMPRESSION)) .get(); assertEquals(s3Config, args.config); assertEquals(CONFIGURED_STREAM, args.stream); diff --git a/airbyte-integrations/connectors/destination-redshift/Dockerfile b/airbyte-integrations/connectors/destination-redshift/Dockerfile index 3e1528f9eaff..be77e3561248 100644 --- a/airbyte-integrations/connectors/destination-redshift/Dockerfile +++ b/airbyte-integrations/connectors/destination-redshift/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.3.39 +LABEL io.airbyte.version=0.3.40 LABEL io.airbyte.name=airbyte/destination-redshift diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java 
b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java index c52884b61c94..310a24c98656 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java @@ -4,8 +4,8 @@ package io.airbyte.integrations.destination.redshift; -import static io.airbyte.integrations.destination.redshift.validator.RedshiftUtil.findS3Options; import static io.airbyte.integrations.destination.redshift.validator.RedshiftUtil.anyOfS3FieldsAreNullOrEmpty; +import static io.airbyte.integrations.destination.redshift.validator.RedshiftUtil.findS3Options; import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.integrations.base.Destination; @@ -16,10 +16,13 @@ import org.slf4j.LoggerFactory; /** - * The Redshift Destination offers two replication strategies. The first inserts via a typical SQL Insert statement. Although less efficient, this requires less user set up. See {@link - * RedshiftInsertDestination} for more detail. The second inserts via streaming the data to an S3 bucket, and Cop-ing the date into Redshift. This is more efficient, and recommended for production - * workloads, but does require users to set up an S3 bucket and pass in additional credentials. See {@link RedshiftStagingS3Destination} for more detail. This class inspect the given arguments to - * determine which strategy to use. + * The Redshift Destination offers two replication strategies. The first inserts via a typical SQL + * Insert statement. Although less efficient, this requires less user setup. See + * {@link RedshiftInsertDestination} for more detail. The second inserts via streaming the data to + * an S3 bucket, and COPYing the data into Redshift. This is more efficient, and recommended for + * production workloads, but does require users to set up an S3 bucket and pass in additional + * credentials. See {@link RedshiftStagingS3Destination} for more detail. This class inspects the + * given arguments to determine which strategy to use.
*/ public class RedshiftDestination extends SwitchingDestination { @@ -28,8 +31,7 @@ public class RedshiftDestination extends SwitchingDestination destinationMap = Map.of( DestinationType.STANDARD, new RedshiftInsertDestination(), - DestinationType.COPY_S3, new RedshiftStagingS3Destination() - ); + DestinationType.COPY_S3, new RedshiftStagingS3Destination()); enum DestinationType { STANDARD, @@ -62,4 +64,5 @@ public static void main(final String[] args) throws Exception { new IntegrationRunner(destination).run(args); LOGGER.info("completed destination: {}", RedshiftDestination.class); } + } diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java index 91609c5019dc..d36817b4ea7d 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java @@ -105,8 +105,8 @@ public JsonNode toJdbcConfig(final JsonNode config) { @Override public AirbyteMessageConsumer getConsumer(final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector) { + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector) { final S3DestinationConfig s3Config = getS3DestinationConfig(findS3Options(config)); return new StagingConsumerFactory().create( outputRecordCollector, diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/constants/RedshiftDestinationConstants.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/constants/RedshiftDestinationConstants.java index 9fbe512f0acc..15d473c29e3d 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/constants/RedshiftDestinationConstants.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/constants/RedshiftDestinationConstants.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.destination.redshift.constants; /** @@ -5,8 +9,8 @@ */ public class RedshiftDestinationConstants { - private RedshiftDestinationConstants() { - } + private RedshiftDestinationConstants() {} public static final String UPLOADING_METHOD = "uploading_method"; + } diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/validator/RedshiftUtil.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/validator/RedshiftUtil.java index 29f52847e1d5..78d7c5d81be0 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/validator/RedshiftUtil.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/validator/RedshiftUtil.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + package io.airbyte.integrations.destination.redshift.validator; import static io.airbyte.integrations.destination.redshift.constants.RedshiftDestinationConstants.UPLOADING_METHOD; @@ -9,8 +13,7 @@ */ public class RedshiftUtil { - private RedshiftUtil() { - } + private RedshiftUtil() {} // We check whether config located in root of node. (This check is done for Backward compatibility) public static JsonNode findS3Options(final JsonNode config) { @@ -27,4 +30,5 @@ && isNullOrEmpty(jsonNode.get("access_key_id")) private static boolean isNullOrEmpty(final JsonNode jsonNode) { return null == jsonNode || "".equals(jsonNode.asText()); } + } diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json index d70c27665cc7..3dd90f72d04b 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json @@ -67,7 +67,13 @@ { "title": "S3 Staging", "additionalProperties": false, - "required": ["method", "s3_bucket_name", "s3_bucket_region", "access_key_id", "secret_access_key"], + "required": [ + "method", + "s3_bucket_name", + "s3_bucket_region", + "access_key_id", + "secret_access_key" + ], "properties": { "method": { "type": "string", @@ -129,14 +135,6 @@ "title": "S3 Access Key", "airbyte_secret": true }, - "part_size": { - "type": "integer", - "minimum": 10, - "maximum": 100, - "examples": ["10"], - "description": "Increase this if syncing tables larger than 100GB. Only relevant for COPY. Files are streamed to S3 in parts. This determines the size of each part, in MBs. As S3 has a limit of 10,000 parts per file, part size affects the table size. This is 10MB by default, resulting in a default limit of 100GB tables. Note: a larger part size will result in larger memory requirements. A rule of thumb is to multiply the part size by 10 to get the memory requirement. Modify this with care. 
See docs for details.", - "title": "Stream Part Size (Optional)" - }, "purge_staging_data": { "title": "Purge Staging Files and Tables (Optional)", "type": "boolean", @@ -150,4 +148,3 @@ } } } - diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java index c699438ce8b5..80a53948a483 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java @@ -8,10 +8,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.string.Strings; import io.airbyte.db.Database; diff --git a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationTest.java b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationTest.java index 700e8c7d0f37..bfc1f2897ee8 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationTest.java @@ -58,4 +58,5 @@ public void useS3StagingBackwardCompatibility() { s3StagingStub.put("secret_access_key", "test key"); assertEquals(DestinationType.COPY_S3, RedshiftDestination.determineUploadMode(s3StagingStub)); } + } diff --git a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/copiers/RedshiftStreamCopierTest.java b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/copiers/RedshiftStreamCopierTest.java index c6eca7829607..6681540b0425 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/copiers/RedshiftStreamCopierTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/copiers/RedshiftStreamCopierTest.java @@ -39,7 +39,6 @@ class RedshiftStreamCopierTest { private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftStreamCopierTest.class); - private static final int PART_SIZE = 5; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // The full path would be something like @@ -71,7 +70,6 @@ public void setup() { "fake-region") .withEndpoint("fake-endpoint") .withAccessKeyCredential("fake-access-key-id", "fake-secret-access-key") - .withPartSize(PART_SIZE) .get(); copier = new RedshiftStreamCopier( diff --git a/airbyte-integrations/connectors/destination-s3/Dockerfile b/airbyte-integrations/connectors/destination-s3/Dockerfile index 
9400c975836b..0a09fb3112fc 100644 --- a/airbyte-integrations/connectors/destination-s3/Dockerfile +++ b/airbyte-integrations/connectors/destination-s3/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-s3 COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.3.7 +LABEL io.airbyte.version=0.3.8 LABEL io.airbyte.name=airbyte/destination-s3 diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java index 7c8044706f5d..34bfed0eb745 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java @@ -20,8 +20,7 @@ /** - * An S3 configuration. Typical usage sets at most one of {@code bucketPath} (necessary for more - * delicate data syncing to S3) and {@code partSize} (used by certain bulk-load database - * operations). + * An S3 configuration. Typical usage sets {@code bucketPath} (necessary for more delicate data + * syncing to S3). */ public class S3DestinationConfig { @@ -33,8 +32,6 @@ public class S3DestinationConfig { private final String bucketRegion; private final String pathFormat; private final S3CredentialConfig credentialConfig; - @Deprecated - private final Integer partSize; private final S3FormatConfig formatConfig; private final Object lock = new Object(); @@ -46,7 +43,6 @@ public S3DestinationConfig(final String endpoint, final String bucketRegion, final String pathFormat, final S3CredentialConfig credentialConfig, - final Integer partSize, final S3FormatConfig formatConfig, final AmazonS3 s3Client) { this.endpoint = endpoint; @@ -56,7 +52,6 @@ public S3DestinationConfig(final String endpoint, this.pathFormat = pathFormat; this.credentialConfig = credentialConfig; this.formatConfig = formatConfig; - this.partSize = partSize; this.s3Client = s3Client; } @@ -68,7 +63,6 @@ public static Builder create(final S3DestinationConfig config) { return new Builder(config.getBucketName(), config.getBucketPath(), config.getBucketRegion()) .withEndpoint(config.getEndpoint()) .withCredentialConfig(config.getS3CredentialConfig()) - .withPartSize(config.getPartSize()) .withFormatConfig(config.getFormatConfig()); } @@ -90,10 +84,6 @@ public static S3DestinationConfig getS3DestinationConfig(final JsonNode config) builder = builder.withEndpoint(config.get("s3_endpoint").asText()); } - if (config.has("part_size")) { - builder = builder.withPartSize(config.get("part_size").asInt()); - } - final S3CredentialConfig credentialConfig; if (config.has("access_key_id")) { credentialConfig = new S3AccessKeyCredentialConfig(config.get("access_key_id").asText(), config.get("secret_access_key").asText()); @@ -135,10 +125,6 @@ public S3CredentialConfig getS3CredentialConfig() { return credentialConfig; } - public Integer getPartSize() { - return partSize; - } - public S3FormatConfig getFormatConfig() { return formatConfig; } @@ -217,7 +203,6 @@ public static class Builder { private String endpoint = ""; private String pathFormat = S3DestinationConstants.DEFAULT_PATH_FORMAT; - private int partSize = S3DestinationConstants.DEFAULT_PART_SIZE_MB; private String bucketName; private String bucketPath; @@ -257,11 +242,6 @@ public Builder withEndpoint(final String endpoint) { return this; } - public Builder withPartSize(final int partSize) { - this.partSize = partSize; - return this; }
- public Builder withFormatConfig(final S3FormatConfig formatConfig) { this.formatConfig = formatConfig; return this; @@ -290,7 +270,6 @@ public S3DestinationConfig get() { bucketRegion, pathFormat, credentialConfig, - partSize, formatConfig, s3Client); } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConstants.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConstants.java index c3b9013acb53..89641d9357ad 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConstants.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConstants.java @@ -11,11 +11,6 @@ public final class S3DestinationConstants { public static final String YYYY_MM_DD_FORMAT_STRING = "yyyy_MM_dd"; public static final S3NameTransformer NAME_TRANSFORMER = new S3NameTransformer(); - public static final String PART_SIZE_MB_ARG_NAME = "part_size_mb"; - // The smallest part size is 5MB. An S3 upload can be maximally formed of 10,000 parts. This gives - // us an upper limit of 10,000 * 10 / 1000 = 100 GB per table with a 10MB part size limit. - // WARNING: Too large a part size can cause potential OOM errors. - public static final int DEFAULT_PART_SIZE_MB = 10; public static final String DEFAULT_PATH_FORMAT = "${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_"; // gzip compression for CSV and JSONL diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3FormatConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3FormatConfig.java index 88e1b124d16c..77856bdcec2a 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3FormatConfig.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3FormatConfig.java @@ -10,8 +10,6 @@ public interface S3FormatConfig { S3Format getFormat(); - Long getPartSize(); - String getFileExtension(); static String withDefault(final JsonNode config, final String property, final String defaultValue) { diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java index 4b62a4ed3a8f..59a9fa92a9ea 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java @@ -133,7 +133,7 @@ public String uploadRecordsToBucket(final SerializableBuffer recordsData, * @return the uploaded filename, which is different from the serialized buffer filename */ private String loadDataIntoBucket(final String objectPath, final SerializableBuffer recordsData) throws IOException { - final long partSize = s3Config.getFormatConfig() != null ? 
s3Config.getFormatConfig().getPartSize() : DEFAULT_PART_SIZE; + final long partSize = DEFAULT_PART_SIZE; final String bucket = s3Config.getBucketName(); final String fullObjectKey = objectPath + getPartId(objectPath) + getExtension(recordsData.getFilename()); diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroFormatConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroFormatConfig.java index 3f8aae7ed646..2a086a32b717 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroFormatConfig.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroFormatConfig.java @@ -4,10 +4,7 @@ package io.airbyte.integrations.destination.s3.avro; -import static io.airbyte.integrations.destination.s3.S3DestinationConstants.PART_SIZE_MB_ARG_NAME; - import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.integrations.destination.s3.S3DestinationConstants; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.S3FormatConfig; import org.apache.avro.file.CodecFactory; @@ -17,18 +14,13 @@ public class S3AvroFormatConfig implements S3FormatConfig { public static final String DEFAULT_SUFFIX = ".avro"; private final CodecFactory codecFactory; - private final Long partSize; - public S3AvroFormatConfig(final CodecFactory codecFactory, final long partSize) { + public S3AvroFormatConfig(final CodecFactory codecFactory) { this.codecFactory = codecFactory; - this.partSize = partSize; } public S3AvroFormatConfig(final JsonNode formatConfig) { this.codecFactory = parseCodecConfig(formatConfig.get("compression_codec")); - this.partSize = formatConfig.get(PART_SIZE_MB_ARG_NAME) != null - ? formatConfig.get(PART_SIZE_MB_ARG_NAME).asLong() - : S3DestinationConstants.DEFAULT_PART_SIZE_MB; } public static CodecFactory parseCodecConfig(final JsonNode compressionCodecConfig) { @@ -96,10 +88,6 @@ public CodecFactory getCodecFactory() { return codecFactory; } - public Long getPartSize() { - return partSize; - } - @Override public String getFileExtension() { return DEFAULT_SUFFIX; diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroWriter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroWriter.java index 8bd3676a1474..9eece89ed9f8 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroWriter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroWriter.java @@ -56,7 +56,6 @@ public S3AvroWriter(final S3DestinationConfig config, this.avroRecordFactory = new AvroRecordFactory(schema, converter); this.uploadManager = StreamTransferManagerFactory .create(config.getBucketName(), objectKey, s3Client) - .setPartSize(config.getFormatConfig().getPartSize()) .get(); // We only need one output stream as we only have one input stream. This is reasonably performant. 
this.outputStream = uploadManager.getMultiPartOutputStreams().get(0); diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvFormatConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvFormatConfig.java index 4ca449379109..d6ff3c132146 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvFormatConfig.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvFormatConfig.java @@ -6,11 +6,9 @@ import static io.airbyte.integrations.destination.s3.S3DestinationConstants.COMPRESSION_ARG_NAME; import static io.airbyte.integrations.destination.s3.S3DestinationConstants.DEFAULT_COMPRESSION_TYPE; -import static io.airbyte.integrations.destination.s3.S3DestinationConstants.PART_SIZE_MB_ARG_NAME; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.integrations.destination.s3.S3DestinationConstants; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.S3FormatConfig; import io.airbyte.integrations.destination.s3.util.CompressionType; @@ -50,24 +48,18 @@ public String getValue() { } private final Flattening flattening; - @Deprecated - private final Long partSize; private final CompressionType compressionType; public S3CsvFormatConfig(final JsonNode formatConfig) { this( Flattening.fromValue(formatConfig.has("flattening") ? formatConfig.get("flattening").asText() : Flattening.NO.value), - formatConfig.has(PART_SIZE_MB_ARG_NAME) - ? formatConfig.get(PART_SIZE_MB_ARG_NAME).asLong() - : S3DestinationConstants.DEFAULT_PART_SIZE_MB, formatConfig.has(COMPRESSION_ARG_NAME) ? 
CompressionTypeHelper.parseCompressionType(formatConfig.get(COMPRESSION_ARG_NAME)) : DEFAULT_COMPRESSION_TYPE); } - public S3CsvFormatConfig(final Flattening flattening, final Long partSize, final CompressionType compressionType) { + public S3CsvFormatConfig(final Flattening flattening, final CompressionType compressionType) { this.flattening = flattening; - this.partSize = partSize; this.compressionType = compressionType; } @@ -80,11 +72,6 @@ public Flattening getFlattening() { return flattening; } - @Override - public Long getPartSize() { - return partSize; - } - @Override public String getFileExtension() { return CSV_SUFFIX + compressionType.getFileExtension(); @@ -98,7 +85,6 @@ public CompressionType getCompressionType() { public String toString() { return "S3CsvFormatConfig{" + "flattening=" + flattening + - ", partSize=" + partSize + ", compression=" + compressionType.name() + '}'; } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriter.java index 15ace28740b7..cce2da71e33f 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriter.java @@ -61,7 +61,6 @@ private S3CsvWriter(final S3DestinationConfig config, this.uploadManager = StreamTransferManagerFactory .create(config.getBucketName(), objectKey, s3Client) - .setPartSize(config.getFormatConfig().getPartSize()) .get() .numUploadThreads(uploadThreads) .queueCapacity(queueCapacity); diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlFormatConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlFormatConfig.java index 93c10dc677c2..3904da3d8de1 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlFormatConfig.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlFormatConfig.java @@ -6,10 +6,8 @@ import static io.airbyte.integrations.destination.s3.S3DestinationConstants.COMPRESSION_ARG_NAME; import static io.airbyte.integrations.destination.s3.S3DestinationConstants.DEFAULT_COMPRESSION_TYPE; -import static io.airbyte.integrations.destination.s3.S3DestinationConstants.PART_SIZE_MB_ARG_NAME; import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.integrations.destination.s3.S3DestinationConstants; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.S3FormatConfig; import io.airbyte.integrations.destination.s3.util.CompressionType; @@ -20,13 +18,9 @@ public class S3JsonlFormatConfig implements S3FormatConfig { public static final String JSONL_SUFFIX = ".jsonl"; - private final Long partSize; private final CompressionType compressionType; public S3JsonlFormatConfig(final JsonNode formatConfig) { - this.partSize = formatConfig.has(PART_SIZE_MB_ARG_NAME) - ? formatConfig.get(PART_SIZE_MB_ARG_NAME).asLong() - : S3DestinationConstants.DEFAULT_PART_SIZE_MB; this.compressionType = formatConfig.has(COMPRESSION_ARG_NAME) ? 
CompressionTypeHelper.parseCompressionType(formatConfig.get(COMPRESSION_ARG_NAME)) : DEFAULT_COMPRESSION_TYPE; @@ -37,10 +31,6 @@ public S3Format getFormat() { return S3Format.JSONL; } - public Long getPartSize() { - return partSize; - } - @Override public String getFileExtension() { return JSONL_SUFFIX + compressionType.getFileExtension(); @@ -53,7 +43,6 @@ public CompressionType getCompressionType() { @Override public String toString() { return "S3JsonlFormatConfig{" + - ", partSize=" + partSize + - ", compression=" + compressionType.name() + + "compression=" + compressionType.name() + '}'; } @@ -67,12 +56,12 @@ public boolean equals(final Object o) { return false; } final S3JsonlFormatConfig that = (S3JsonlFormatConfig) o; - return Objects.equals(partSize, that.partSize) && Objects.equals(compressionType, that.compressionType); + return Objects.equals(compressionType, that.compressionType); } @Override public int hashCode() { - return Objects.hash(partSize, compressionType); + return Objects.hash(compressionType); } } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlWriter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlWriter.java index 9d8e79a06e12..b415100a4e77 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlWriter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlWriter.java @@ -54,7 +54,6 @@ public S3JsonlWriter(final S3DestinationConfig config, this.uploadManager = StreamTransferManagerFactory .create(config.getBucketName(), objectKey, s3Client) - .setPartSize(config.getFormatConfig().getPartSize()) .get(); // We only need one output stream as we only have one input stream. This is reasonably performant.
this.outputStream = uploadManager.getMultiPartOutputStreams().get(0); diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetFormatConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetFormatConfig.java index 88e389e65d9b..77cf6656a54d 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetFormatConfig.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetFormatConfig.java @@ -5,7 +5,6 @@ package io.airbyte.integrations.destination.s3.parquet; import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.integrations.destination.s3.S3DestinationConstants; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.S3FormatConfig; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -42,12 +41,6 @@ public S3Format getFormat() { return S3Format.PARQUET; } - @Override - public Long getPartSize() { - // not applicable for Parquet format - return Integer.toUnsignedLong(S3DestinationConstants.DEFAULT_PART_SIZE_MB); - } - @Override public String getFileExtension() { return PARQUET_SUFFIX; diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/S3DestinationConfigTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/S3DestinationConfigTest.java index e81900c78683..c802b16db64b 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/S3DestinationConfigTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/S3DestinationConfigTest.java @@ -13,7 +13,6 @@ class S3DestinationConfigTest { private static final S3DestinationConfig CONFIG = S3DestinationConfig.create("test-bucket", "test-path", "test-region") .withEndpoint("test-endpoint") - .withPartSize(19) .withPathFormat("${STREAM_NAME}/${NAMESPACE}") .withAccessKeyCredential("test-key", "test-secret") .get(); @@ -29,7 +28,6 @@ public void testCreateAndModify() { final String newBucketPath = "new-path"; final String newBucketRegion = "new-region"; final String newEndpoint = "new-endpoint"; - final int newPartSize = 29; final String newKey = "new-key"; final String newSecret = "new-secret"; @@ -39,7 +37,6 @@ public void testCreateAndModify() { .withBucketRegion(newBucketRegion) .withEndpoint(newEndpoint) .withAccessKeyCredential(newKey, newSecret) - .withPartSize(newPartSize) .get(); assertNotEquals(CONFIG, modifiedConfig); diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/S3AvroFormatConfigTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/S3AvroFormatConfigTest.java index 8dccdac4391b..496eb5280f47 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/S3AvroFormatConfigTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/S3AvroFormatConfigTest.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.destination.s3.avro; import static com.amazonaws.services.s3.internal.Constants.MB; +import static 
io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; @@ -13,7 +14,6 @@ import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.destination.s3.S3DestinationConfig; -import io.airbyte.integrations.destination.s3.S3DestinationConstants; import io.airbyte.integrations.destination.s3.S3FormatConfig; import io.airbyte.integrations.destination.s3.util.ConfigTestUtils; import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory; @@ -106,8 +106,7 @@ public void testParseCodecConfigInvalid() { public void testHandlePartSizeConfig() throws IllegalAccessException { final JsonNode config = ConfigTestUtils.getBaseConfig(Jsons.deserialize("{\n" - + " \"format_type\": \"AVRO\",\n" - + " \"part_size_mb\": 6\n" + + " \"format_type\": \"AVRO\"\n" + "}")); final S3DestinationConfig s3DestinationConfig = S3DestinationConfig @@ -116,15 +115,13 @@ public void testHandlePartSizeConfig() throws IllegalAccessException { final S3FormatConfig formatConfig = s3DestinationConfig.getFormatConfig(); assertEquals("AVRO", formatConfig.getFormat().name()); - assertEquals(6, formatConfig.getPartSize()); // Assert that is set properly in config final StreamTransferManager streamTransferManager = StreamTransferManagerFactory .create(s3DestinationConfig.getBucketName(), "objectKey", null) - .setPartSize(s3DestinationConfig.getFormatConfig().getPartSize()) .get(); final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true); - assertEquals(MB * 6, partSizeBytes); + assertEquals(MB * DEFAULT_PART_SIZE_MB, partSizeBytes); } @Test @@ -140,11 +137,10 @@ public void testHandleAbsenceOfPartSizeConfig() throws IllegalAccessException { final StreamTransferManager streamTransferManager = StreamTransferManagerFactory .create(s3DestinationConfig.getBucketName(), "objectKey", null) - .setPartSize(s3DestinationConfig.getFormatConfig().getPartSize()) .get(); final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true); - assertEquals(MB * S3DestinationConstants.DEFAULT_PART_SIZE_MB, partSizeBytes); + assertEquals(MB * DEFAULT_PART_SIZE_MB, partSizeBytes); } } diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/S3CsvFormatConfigTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/S3CsvFormatConfigTest.java index 76df86a146f6..f087d4d01316 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/S3CsvFormatConfigTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/S3CsvFormatConfigTest.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.destination.s3.csv; import static com.amazonaws.services.s3.internal.Constants.MB; +import static io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -42,8 +43,7 @@ public void testHandlePartSizeConfig() throws IllegalAccessException { final JsonNode config = ConfigTestUtils.getBaseConfig(Jsons.deserialize("{\n" + " \"format_type\": \"CSV\",\n" - + " \"flattening\": \"Root level 
flattening\",\n" - + " \"part_size_mb\": 6\n" + + " \"flattening\": \"Root level flattening\"\n" + "}")); final S3DestinationConfig s3DestinationConfig = S3DestinationConfig @@ -52,15 +52,13 @@ public void testHandlePartSizeConfig() throws IllegalAccessException { final S3FormatConfig formatConfig = s3DestinationConfig.getFormatConfig(); assertEquals("CSV", formatConfig.getFormat().name()); - assertEquals(6, formatConfig.getPartSize()); // Assert that is set properly in config final StreamTransferManager streamTransferManager = StreamTransferManagerFactory .create(s3DestinationConfig.getBucketName(), "objectKey", null) - .setPartSize(s3DestinationConfig.getFormatConfig().getPartSize()) .get(); final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true); - assertEquals(MB * 6, partSizeBytes); + assertEquals(MB * DEFAULT_PART_SIZE_MB, partSizeBytes); } @Test @@ -77,11 +75,10 @@ public void testHandleAbsenceOfPartSizeConfig() throws IllegalAccessException { final StreamTransferManager streamTransferManager = StreamTransferManagerFactory .create(s3DestinationConfig.getBucketName(), "objectKey", null) - .setPartSize(s3DestinationConfig.getFormatConfig().getPartSize()) .get(); final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true); - assertEquals(MB * S3DestinationConstants.DEFAULT_PART_SIZE_MB, partSizeBytes); + assertEquals(MB * DEFAULT_PART_SIZE_MB, partSizeBytes); } @Test diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriterTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriterTest.java index 56e3a59b8db3..42a6ee1ebade 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriterTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriterTest.java @@ -53,8 +53,7 @@ class S3CsvWriterTest { .withNamespace("fake-namespace")); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final int PART_SIZE = 7; - private static final S3CsvFormatConfig CSV_FORMAT_CONFIG = new S3CsvFormatConfig(Flattening.NO, (long) PART_SIZE, CompressionType.NO_COMPRESSION); + private static final S3CsvFormatConfig CSV_FORMAT_CONFIG = new S3CsvFormatConfig(Flattening.NO, CompressionType.NO_COMPRESSION); private static final S3DestinationConfig CONFIG = S3DestinationConfig.create( "fake-bucket", @@ -62,7 +61,6 @@ class S3CsvWriterTest { "fake-region") .withEndpoint("fake-endpoint") .withAccessKeyCredential("fake-access-key-id", "fake-secret-access-key") - .withPartSize(PART_SIZE) .withFormatConfig(CSV_FORMAT_CONFIG) .get(); @@ -162,7 +160,6 @@ public void createsExactlyOneUpload() throws IOException { final StreamTransferManager manager = streamTransferManagerMockedConstruction.constructed().get(0); final StreamTransferManagerArguments args = streamTransferManagerConstructorArguments.get(0); - verify(manager).partSize(PART_SIZE); verify(manager).numUploadThreads(UPLOAD_THREADS); verify(manager).queueCapacity(QUEUE_CAPACITY); assertEquals("fake-bucket", args.bucket); @@ -255,7 +252,6 @@ public void writesContentsCorrectly_when_stagingDatabaseConfig() throws IOExcept "fake-region") .withEndpoint("fake-endpoint") .withAccessKeyCredential("fake-access-key-id", "fake-secret-access-key") - .withPartSize(PART_SIZE) 
.withFormatConfig(CSV_FORMAT_CONFIG) .get(); final S3CsvWriter writer = new Builder( diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlFormatConfigTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlFormatConfigTest.java index 9e092b114d72..3a9c97199097 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlFormatConfigTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlFormatConfigTest.java @@ -5,13 +5,13 @@ package io.airbyte.integrations.destination.s3.jsonl; import static com.amazonaws.services.s3.internal.Constants.MB; +import static io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB; import static org.junit.jupiter.api.Assertions.assertEquals; import alex.mojaki.s3upload.StreamTransferManager; import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.destination.s3.S3DestinationConfig; -import io.airbyte.integrations.destination.s3.S3DestinationConstants; import io.airbyte.integrations.destination.s3.S3FormatConfig; import io.airbyte.integrations.destination.s3.util.ConfigTestUtils; import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory; @@ -26,8 +26,7 @@ public class S3JsonlFormatConfigTest { public void testHandlePartSizeConfig() throws IllegalAccessException { final JsonNode config = ConfigTestUtils.getBaseConfig(Jsons.deserialize("{\n" - + " \"format_type\": \"JSONL\",\n" - + " \"part_size_mb\": 6\n" + + " \"format_type\": \"JSONL\"\n" + "}")); final S3DestinationConfig s3DestinationConfig = S3DestinationConfig @@ -36,16 +35,14 @@ public void testHandlePartSizeConfig() throws IllegalAccessException { final S3FormatConfig formatConfig = s3DestinationConfig.getFormatConfig(); assertEquals("JSONL", formatConfig.getFormat().name()); - assertEquals(6, formatConfig.getPartSize()); // Assert that is set properly in config final StreamTransferManager streamTransferManager = StreamTransferManagerFactory .create(s3DestinationConfig.getBucketName(), "objectKey", null) - .setPartSize(s3DestinationConfig.getFormatConfig().getPartSize()) .get(); final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true); - assertEquals(MB * 6, partSizeBytes); + assertEquals(MB * DEFAULT_PART_SIZE_MB, partSizeBytes); } @Test @@ -61,11 +58,10 @@ public void testHandleAbsenceOfPartSizeConfig() throws IllegalAccessException { final StreamTransferManager streamTransferManager = StreamTransferManagerFactory .create(s3DestinationConfig.getBucketName(), "objectKey", null) - .setPartSize(s3DestinationConfig.getFormatConfig().getPartSize()) .get(); final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true); - assertEquals(MB * S3DestinationConstants.DEFAULT_PART_SIZE_MB, partSizeBytes); + assertEquals(MB * DEFAULT_PART_SIZE_MB, partSizeBytes); } } diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index dc1c4a0ba295..c424da73c199 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -20,5 +20,5 @@ RUN tar xf ${APPLICATION}.tar 
--strip-components=1 ENV ENABLE_SENTRY true -LABEL io.airbyte.version=0.4.28 +LABEL io.airbyte.version=0.4.29 LABEL io.airbyte.name=airbyte/destination-snowflake diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStagingSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStagingSqlOperations.java index b8e0fe522ec4..1933ebc299b6 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStagingSqlOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStagingSqlOperations.java @@ -28,10 +28,12 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; + import org.joda.time.DateTime; public class SnowflakeGcsStagingSqlOperations extends SnowflakeSqlOperations implements StagingOperations { @@ -190,7 +192,14 @@ public void dropStageIfExists(JdbcDatabase database, String stageName) throws Ex private void dropBucketObject() { if (!fullObjectKeys.isEmpty()) { - fullObjectKeys.forEach(this::removeBlob); + final Iterator<String> iterator = fullObjectKeys.iterator(); + while (iterator.hasNext()) { + final String element = iterator.next(); + if (element != null) { + removeBlob(element); + iterator.remove(); + } + } } } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json index f02f6172349e..51b77c80de96 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json @@ -233,27 +233,19 @@ "airbyte_secret": true, "order": 4 }, - "part_size": { - "type": "integer", - "default": 5, - "examples": [5], - "description": "Optional. Increase this if syncing tables larger than 100GB. Only relevant for COPY. Files are streamed to S3 in parts. This determines the size of each part, in MBs. As S3 has a limit of 10,000 parts per file, part size affects the table size. This is 10MB by default, resulting in a default limit of 100GB tables. Note, a larger part size will result in larger memory requirements. A rule of thumb is to multiply the part size by 10 to get the memory requirement. Modify this with care.", - "title": "Stream Part Size", - "order": 5 - }, "purge_staging_data": { "title": "Purge Staging Files and Tables", "type": "boolean", "description": "Whether to delete the staging files from S3 after completing the sync. See the docs for details. Only relevant for COPY.
Defaults to true.", "default": true, - "order": 6 + "order": 5 }, "encryption": { "title": "Encryption", "type": "object", "description": "How to encrypt the staging data", "default": { "encryption_type": "none" }, - "order": 7, + "order": 6, "oneOf": [ { "title": "No encryption", diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierTest.java index 9e5e555a6a2a..a899e7562d60 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierTest.java @@ -30,8 +30,6 @@ class SnowflakeS3StreamCopierTest { - private static final int PART_SIZE = 5; - // equivalent to Thu, 09 Dec 2021 19:17:54 GMT private static final Timestamp UPLOAD_TIME = Timestamp.from(Instant.ofEpochMilli(1639077474000L)); @@ -52,7 +50,6 @@ public void setup() throws Exception { "fake-region") .withEndpoint("fake-endpoint") .withAccessKeyCredential("fake-access-key-id", "fake-secret-access-key") - .withPartSize(PART_SIZE) .get(); copier = (SnowflakeS3StreamCopier) new SnowflakeS3StreamCopierFactory().create( diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/resources/copy_s3_config.json b/airbyte-integrations/connectors/destination-snowflake/src/test/resources/copy_s3_config.json index cd982b0f2805..bf55f9a2fd92 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/resources/copy_s3_config.json +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/resources/copy_s3_config.json @@ -13,7 +13,6 @@ "s3_bucket_name": "airbyte-snowflake-integration-tests", "s3_bucket_region": "us-east-2", "access_key_id": "test", - "secret_access_key": "test", - "part_size": 5 + "secret_access_key": "test" } } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/resources/copy_s3_encrypted_config.json b/airbyte-integrations/connectors/destination-snowflake/src/test/resources/copy_s3_encrypted_config.json index da8a8cbe1927..e0c5e3b62344 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/resources/copy_s3_encrypted_config.json +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/resources/copy_s3_encrypted_config.json @@ -14,7 +14,6 @@ "s3_bucket_region": "us-east-2", "access_key_id": "test", "secret_access_key": "test", - "part_size": 5, "encryption": { "encryption_type": "aes_cbc_envelope" } diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index 0ce737895f55..ff98080fe7be 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -133,7 +133,8 @@ Now that you have set up the BigQuery destination connector, check out the follo | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------| -| 1.1.9 | 2022-06-16 | [\#13852](https://github.com/airbytehq/airbyte/pull/13852) | Updated stacktrace format for any trace message errors | +| 1.1.10 | 2022-06-16 | 
[\#13852](https://github.com/airbytehq/airbyte/pull/13852) | Updated stacktrace format for any trace message errors | +| 1.1.9 | 2022-06-17 | [\#13753](https://github.com/airbytehq/airbyte/pull/13753) | Deprecate and remove PART_SIZE_MB fields from connectors based on StreamTransferManager | | 1.1.8 | 2022-06-07 | [13579](https://github.com/airbytehq/airbyte/pull/13579) | Always check GCS bucket for GCS loading method to catch invalid HMAC keys. | | 1.1.7 | 2022-06-07 | [13424](https://github.com/airbytehq/airbyte/pull/13424) | Reordered fields for specification. | | 1.1.6 | 2022-05-15 | [12768](https://github.com/airbytehq/airbyte/pull/12768) | Clarify that the service account key json field is required on cloud. | @@ -172,7 +173,8 @@ Now that you have set up the BigQuery destination connector, check out the follo | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------| -| 1.1.9 | 2022-06-16 | [\#13852](https://github.com/airbytehq/airbyte/pull/13852) | Updated stacktrace format for any trace message errors | +| 1.1.10 | 2022-06-16 | [\#13852](https://github.com/airbytehq/airbyte/pull/13852) | Updated stacktrace format for any trace message errors | +| 1.1.9 | 2022-06-17 | [\#13753](https://github.com/airbytehq/airbyte/pull/13753) | Deprecate and remove PART_SIZE_MB fields from connectors based on StreamTransferManager | | 1.1.8 | 2022-06-07 | [13579](https://github.com/airbytehq/airbyte/pull/13579) | Always check GCS bucket for GCS loading method to catch invalid HMAC keys. | | 1.1.7 | 2022-06-07 | [13424](https://github.com/airbytehq/airbyte/pull/13424) | Reordered fields for specification. | | 1.1.6 | 2022-05-15 | [12768](https://github.com/airbytehq/airbyte/pull/12768) | Clarify that the service account key json field is required on cloud. | diff --git a/docs/integrations/destinations/gcs.md b/docs/integrations/destinations/gcs.md index d77d42dc776b..4041d4ffbbe4 100644 --- a/docs/integrations/destinations/gcs.md +++ b/docs/integrations/destinations/gcs.md @@ -235,6 +235,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.2.8 | 2022-06-17 | [\#13753](https://github.com/airbytehq/airbyte/pull/13753) | Deprecate and remove PART_SIZE_MB fields from connectors based on StreamTransferManager | | 0.2.7 | 2022-06-14 | [\#13483](https://github.com/airbytehq/airbyte/pull/13483) | Added support for int, long, float data types to Avro/Parquet formats. | | 0.2.6 | 2022-05-17 | [12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance | | 0.2.5 | 2022-05-04 | [\#12578](https://github.com/airbytehq/airbyte/pull/12578) | In JSON to Avro conversion, log JSON field values that do not follow Avro schema for debugging. | diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index b53603ffb341..cb50da71e6c2 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -138,6 +138,7 @@ Each stream will be output into its own raw table in Redshift. 
Each table will c | Version | Date | Pull Request | Subject | |:--------|:------------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.3.40 | 2022-06-17 | [\#13753](https://github.com/airbytehq/airbyte/pull/13753) | Deprecate and remove PART_SIZE_MB fields from connectors based on StreamTransferManager | | 0.3.39 | 2022-06-02 | [13415](https://github.com/airbytehq/airbyte/pull/13415) | Add dropdown to select Uploading Method.
**PLEASE NOTICE**: After this update your **uploading method** will be set to **Standard**, you will need to reconfigure the method to use **S3 Staging** again. | | 0.3.37 | 2022-05-23 | [13090](https://github.com/airbytehq/airbyte/pull/13090) | Removed redshiftDataTmpTableMode. Some refactoring. | | 0.3.36 | 2022-05-23 | [12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance | diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index cda1e5dfc1eb..8227b61f69b0 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -315,6 +315,7 @@ In order for everything to work correctly, it is also necessary that the user wh | Version | Date | Pull Request | Subject | |:--------| :--- | :--- |:---------------------------------------------------------------------------------------------------------------------------| +| 0.3.8 | 2022-06-17 | [\#13753](https://github.com/airbytehq/airbyte/pull/13753) | Deprecate and remove PART_SIZE_MB fields from connectors based on StreamTransferManager | | 0.3.7 | 2022-06-14 | [\#13483](https://github.com/airbytehq/airbyte/pull/13483) | Added support for int, long, float data types to Avro/Parquet formats. | | 0.3.6 | 2022-05-19 | [\#13043](https://github.com/airbytehq/airbyte/pull/13043) | Destination S3: Remove configurable part size. | | 0.3.5 | 2022-05-12 | [\#12797](https://github.com/airbytehq/airbyte/pull/12797) | Update spec to replace markdown. | diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index f3d3ede66f32..399d4f48d748 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -231,6 +231,7 @@ Now that you have set up the Snowflake destination connector, check out the foll | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.4.29 | 2022-06-17 | [\#13753](https://github.com/airbytehq/airbyte/pull/13753) | Deprecate and remove PART_SIZE_MB fields from connectors based on StreamTransferManager | | 0.4.28 | 2022-05-18 | [\#12952](https://github.com/airbytehq/airbyte/pull/12952) | Apply buffering strategy on GCS staging | | 0.4.27 | 2022-05-17 | [12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance | | 0.4.26 | 2022-05-12 | [\#12805](https://github.com/airbytehq/airbyte/pull/12805) | Updated to latest base-java to emit AirbyteTraceMessages on error. |
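The net behavior that the updated S3, GCS, Redshift, and Snowflake tests assert can be reduced to a short sketch, assuming the StreamTransferManagerFactory API and the DEFAULT_PART_SIZE_MB constant referenced in the test diffs above: with the per-connector part-size knobs removed, the factory always applies its own default, and the only way to observe it is reflectively, exactly as the tests do.

import static com.amazonaws.services.s3.internal.Constants.MB;
import static io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB;
import static org.junit.jupiter.api.Assertions.assertEquals;

import alex.mojaki.s3upload.StreamTransferManager;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.junit.jupiter.api.Test;

class DefaultPartSizeSketch { // illustrative only, not part of this patch

  @Test
  void factoryAppliesDefaultPartSize() throws IllegalAccessException {
    // No part-size setter is invoked anywhere; the factory falls back to DEFAULT_PART_SIZE_MB.
    final StreamTransferManager manager = StreamTransferManagerFactory
        .create("any-bucket", "any-object-key", null) // a null S3 client suffices to inspect the config, as in the tests
        .get();

    // partSize is private on StreamTransferManager, so read it reflectively, as the updated tests do.
    final Integer partSizeBytes = (Integer) FieldUtils.readField(manager, "partSize", true);
    assertEquals(MB * DEFAULT_PART_SIZE_MB, partSizeBytes);
  }
}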