diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 6b0047f07c18..a6937ef3381b 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -225,7 +225,7 @@ - name: Redshift destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc dockerRepository: airbyte/destination-redshift - dockerImageTag: 0.3.40 + dockerImageTag: 0.3.41 documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift icon: redshift.svg resourceRequirements: diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index dcd20018abfd..45bfc705947a 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -3622,7 +3622,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-redshift:0.3.40" +- dockerImage: "airbyte/destination-redshift:0.3.41" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift" connectionSpecification: @@ -3773,6 +3773,33 @@ \ the sync. See docs for details." default: true + encryption: + title: "Encryption" + description: "How to encrypt the staging data" + oneOf: + - title: "No encryption" + description: "Staging data will be stored in plaintext." + type: "object" + required: + - "encryption_type" + properties: + encryption_type: + type: "string" + const: "none" + - title: "AES-CBC envelope encryption" + description: "Staging data will be encrypted using AES-CBC envelope encryption." 
+ type: "object" + required: + "encryption_type" + properties: + encryption_type: + type: "string" + const: "aes_cbc_envelope" + key_encrypting_key: + type: "string" + title: "Key" + description: "The key, base64-encoded. Must be either 128, 192, or 256 bits. Leave blank to have Airbyte generate an ephemeral key for each sync.", + airbyte_secret: true supportsIncremental: true supportsNormalization: true supportsDBT: true diff --git a/airbyte-integrations/connectors/destination-redshift/Dockerfile b/airbyte-integrations/connectors/destination-redshift/Dockerfile index be77e3561248..b1f97f43ef25 100644 --- a/airbyte-integrations/connectors/destination-redshift/Dockerfile +++ b/airbyte-integrations/connectors/destination-redshift/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.3.40 +LABEL io.airbyte.version=0.3.41 LABEL io.airbyte.name=airbyte/destination-redshift diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java index d36817b4ea7d..89ef29bd9a42 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java @@ -25,12 +25,17 @@ import io.airbyte.integrations.destination.record_buffer.FileBuffer; import io.airbyte.integrations.destination.redshift.operations.RedshiftS3StagingSqlOperations; import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations; +import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption; +import 
io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption.KeyType; +import io.airbyte.integrations.destination.s3.EncryptionConfig; +import io.airbyte.integrations.destination.s3.NoEncryption; import io.airbyte.integrations.destination.s3.S3Destination; import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.integrations.destination.s3.S3StorageOperations; import io.airbyte.integrations.destination.s3.csv.CsvSerializedBuffer; import io.airbyte.integrations.destination.staging.StagingConsumerFactory; import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.util.Map; @@ -47,14 +52,26 @@ public RedshiftStagingS3Destination() { super(RedshiftInsertDestination.DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations()); } + private boolean isEphemeralKeysAndPurgingStagingData(JsonNode config, EncryptionConfig encryptionConfig) { + return !isPurgeStagingData(config) && encryptionConfig instanceof AesCbcEnvelopeEncryption c && c.keyType() == KeyType.EPHEMERAL; + } + @Override public AirbyteConnectionStatus check(final JsonNode config) { final S3DestinationConfig s3Config = getS3DestinationConfig(findS3Options(config)); + final EncryptionConfig encryptionConfig = config.has("uploading_method") ? + EncryptionConfig.fromJson(config.get("uploading_method").get("encryption")) : new NoEncryption(); + if (isEphemeralKeysAndPurgingStagingData(config, encryptionConfig)) { + return new AirbyteConnectionStatus() + .withStatus(Status.FAILED) + .withMessage( + "You cannot use ephemeral keys and disable purging your staging data. 
This would produce S3 objects that you cannot decrypt."); + } S3Destination.attemptS3WriteAndDelete(new S3StorageOperations(new RedshiftSQLNameTransformer(), s3Config.getS3Client(), s3Config), s3Config, ""); final NamingConventionTransformer nameTransformer = getNamingResolver(); final RedshiftS3StagingSqlOperations redshiftS3StagingSqlOperations = - new RedshiftS3StagingSqlOperations(nameTransformer, s3Config.getS3Client(), s3Config); + new RedshiftS3StagingSqlOperations(nameTransformer, s3Config.getS3Client(), s3Config, encryptionConfig); final DataSource dataSource = getDataSource(config); try { final JdbcDatabase database = new DefaultJdbcDatabase(dataSource); @@ -108,10 +125,12 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Consumer outputRecordCollector) { final S3DestinationConfig s3Config = getS3DestinationConfig(findS3Options(config)); + final EncryptionConfig encryptionConfig = config.has("uploading_method") ? + EncryptionConfig.fromJson(config.get("uploading_method").get("encryption")) : new NoEncryption(); return new StagingConsumerFactory().create( outputRecordCollector, getDatabase(getDataSource(config)), - new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config), + new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, encryptionConfig), getNamingResolver(), CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX)), config, diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftS3StagingSqlOperations.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftS3StagingSqlOperations.java index 6312810e8ea3..494ee50ff56a 100644 --- 
a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftS3StagingSqlOperations.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftS3StagingSqlOperations.java @@ -13,10 +13,15 @@ import io.airbyte.integrations.destination.record_buffer.SerializableBuffer; import io.airbyte.integrations.destination.redshift.manifest.Entry; import io.airbyte.integrations.destination.redshift.manifest.Manifest; +import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption; +import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryptionBlobDecorator; +import io.airbyte.integrations.destination.s3.EncryptionConfig; import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.integrations.destination.s3.S3StorageOperations; import io.airbyte.integrations.destination.s3.credential.S3AccessKeyCredentialConfig; import io.airbyte.integrations.destination.staging.StagingOperations; +import java.util.Base64; +import java.util.Base64.Encoder; import java.util.List; import java.util.Map; import java.util.Optional; @@ -26,18 +31,27 @@ public class RedshiftS3StagingSqlOperations extends RedshiftSqlOperations implements StagingOperations { + private static final Encoder BASE64_ENCODER = Base64.getEncoder(); private final NamingConventionTransformer nameTransformer; private final S3StorageOperations s3StorageOperations; private final S3DestinationConfig s3Config; private final ObjectMapper objectMapper; + private final byte[] keyEncryptingKey; public RedshiftS3StagingSqlOperations(NamingConventionTransformer nameTransformer, AmazonS3 s3Client, - S3DestinationConfig s3Config) { + S3DestinationConfig s3Config, + final EncryptionConfig encryptionConfig) { this.nameTransformer = nameTransformer; this.s3StorageOperations = new S3StorageOperations(nameTransformer, s3Client, s3Config); this.s3Config = 
s3Config; this.objectMapper = new ObjectMapper(); + if (encryptionConfig instanceof AesCbcEnvelopeEncryption e) { + this.s3StorageOperations.addBlobDecorator(new AesCbcEnvelopeEncryptionBlobDecorator(e.key())); + this.keyEncryptingKey = e.key(); + } else { + this.keyEncryptingKey = null; + } } @@ -99,10 +113,18 @@ public void copyIntoTmpTableFromStage(JdbcDatabase database, private void executeCopy(final String manifestPath, JdbcDatabase db, String schemaName, String tmpTableName) { final S3AccessKeyCredentialConfig credentialConfig = (S3AccessKeyCredentialConfig) s3Config.getS3CredentialConfig(); + final String encryptionClause; + if (keyEncryptingKey == null) { + encryptionClause = ""; + } else { + encryptionClause = String.format(" master_symmetric_key '%s' encrypted", BASE64_ENCODER.encodeToString(keyEncryptingKey)); + } + final var copyQuery = String.format( """ COPY %s.%s FROM '%s' CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s' + %s CSV GZIP REGION '%s' TIMEFORMAT 'auto' STATUPDATE OFF @@ -112,6 +134,7 @@ private void executeCopy(final String manifestPath, JdbcDatabase db, String sche getFullS3Path(s3Config.getBucketName(), manifestPath), credentialConfig.getAccessKeyId(), credentialConfig.getSecretAccessKey(), + encryptionClause, s3Config.getBucketRegion()); Exceptions.toRuntime(() -> db.execute(copyQuery)); diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json index 3dd90f72d04b..e444de5bdb8d 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json @@ -140,6 +140,49 @@ "type": "boolean", "description": "Whether to delete the staging files from S3 after completing the sync. 
See docs for details.", "default": true + }, + "encryption": { + "title": "Encryption", + "type": "object", + "description": "How to encrypt the staging data", + "default": { "encryption_type": "none" }, + "order": 7, + "oneOf": [ + { + "title": "No encryption", + "description": "Staging data will be stored in plaintext.", + "type": "object", + "required": ["encryption_type"], + "properties": { + "encryption_type": { + "type": "string", + "const": "none", + "enum": ["none"], + "default": "none" + } + } + }, + { + "title": "AES-CBC envelope encryption", + "description": "Staging data will be encrypted using AES-CBC envelope encryption.", + "type": "object", + "required": ["encryption_type"], + "properties": { + "encryption_type": { + "type": "string", + "const": "aes_cbc_envelope", + "enum": ["aes_cbc_envelope"], + "default": "aes_cbc_envelope" + }, + "key_encrypting_key": { + "type": "string", + "title": "Key", + "description": "The key, base64-encoded. Must be either 128, 192, or 256 bits. Leave blank to have Airbyte generate an ephemeral key for each sync.", + "airbyte_secret": true + } + } + } + ] } } } diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index cb50da71e6c2..9d11aac7ec3d 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -138,6 +138,7 @@ Each stream will be output into its own raw table in Redshift. 
Each table will c | Version | Date | Pull Request | Subject | |:--------|:------------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.3.41 | 2022-06-21 | [\#13675](https://github.com/airbytehq/airbyte/pull/13675) | Add an option to use encryption with staging in Redshift Destination | | 0.3.40 | 2022-06-17 | [\#13753](https://github.com/airbytehq/airbyte/pull/13753) | Deprecate and remove PART_SIZE_MB fields from connectors based on StreamTransferManager | | 0.3.39 | 2022-06-02 | [13415](https://github.com/airbytehq/airbyte/pull/13415) | Add dropdown to select Uploading Method.
**PLEASE NOTICE**: After this update your **uploading method** will be set to **Standard**, you will need to reconfigure the method to use **S3 Staging** again. | | 0.3.37 | 2022-05-23 | [13090](https://github.com/airbytehq/airbyte/pull/13090) | Removed redshiftDataTmpTableMode. Some refactoring. |