Skip to content

Commit

Permalink
12708: Add an option to use encryption with staging in Redshift Desti…
Browse files Browse the repository at this point in the history
…nation (airbytehq#13675)

* 12708: Add an option to use encryption with staging in Redshift Destination

* 12708: docs/docker configs updated

* 12708: merge with master

* 12708: merge fix

* 12708: code review implementation

* 12708: fix for older configs

* 12708: fix for older configs in check

* 12708: merge from master (consolidation issue)

* 12708: versions updated
  • Loading branch information
kimerinn authored Jun 22, 2022
1 parent dd2d5d0 commit aa28d44
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@
- name: Redshift
destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.3.40
dockerImageTag: 0.3.41
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
icon: redshift.svg
resourceRequirements:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3622,7 +3622,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-redshift:0.3.40"
- dockerImage: "airbyte/destination-redshift:0.3.41"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
connectionSpecification:
Expand Down Expand Up @@ -3773,6 +3773,33 @@
\ the sync. See <a href=\"https://docs.airbyte.com/integrations/destinations/redshift/#:~:text=the%20root%20directory.-,Purge%20Staging%20Data,-Whether%20to%20delete\"\
> docs</a> for details."
default: true
encryption:
title: "Encryption"
description: "How to encrypt the staging data"
oneOf:
- title: "No encryption"
description: "Staging data will be stored in plaintext."
type: "object"
required:
  - "encryption_type"
properties:
encryption_type:
type: "string"
const: "none"
- title: "AES-CBC envelope encryption"
description: "Staging data will be encrypted using AES-CBC envelope encryption."
type: "object"
required:
  - "encryption_type"
properties:
encryption_type:
type: "string"
const: "aes_cbc_envelope"
key_encrypting_key:
type: "string"
title: "Key"
description: "The key, base64-encoded. Must be either 128, 192, or 256 bits. Leave blank to have Airbyte generate an ephemeral key for each sync."
airbyte_secret: true
supportsIncremental: true
supportsNormalization: true
supportsDBT: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.40
LABEL io.airbyte.version=0.3.41
LABEL io.airbyte.name=airbyte/destination-redshift
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.redshift.operations.RedshiftS3StagingSqlOperations;
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption;
import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption.KeyType;
import io.airbyte.integrations.destination.s3.EncryptionConfig;
import io.airbyte.integrations.destination.s3.NoEncryption;
import io.airbyte.integrations.destination.s3.S3Destination;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.S3StorageOperations;
import io.airbyte.integrations.destination.s3.csv.CsvSerializedBuffer;
import io.airbyte.integrations.destination.staging.StagingConsumerFactory;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.Map;
Expand All @@ -47,14 +52,26 @@ public RedshiftStagingS3Destination() {
super(RedshiftInsertDestination.DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations());
}

private boolean isEphemeralKeysAndPurgingStagingData(JsonNode config, EncryptionConfig encryptionConfig) {
return !isPurgeStagingData(config) && encryptionConfig instanceof AesCbcEnvelopeEncryption c && c.keyType() == KeyType.EPHEMERAL;
}

@Override
public AirbyteConnectionStatus check(final JsonNode config) {
final S3DestinationConfig s3Config = getS3DestinationConfig(findS3Options(config));
final EncryptionConfig encryptionConfig = config.has("uploading_method") ?
EncryptionConfig.fromJson(config.get("uploading_method").get("encryption")) : new NoEncryption();
if (isEphemeralKeysAndPurgingStagingData(config, encryptionConfig)) {
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage(
"You cannot use ephemeral keys and disable purging your staging data. This would produce S3 objects that you cannot decrypt.");
}
S3Destination.attemptS3WriteAndDelete(new S3StorageOperations(new RedshiftSQLNameTransformer(), s3Config.getS3Client(), s3Config), s3Config, "");

final NamingConventionTransformer nameTransformer = getNamingResolver();
final RedshiftS3StagingSqlOperations redshiftS3StagingSqlOperations =
new RedshiftS3StagingSqlOperations(nameTransformer, s3Config.getS3Client(), s3Config);
new RedshiftS3StagingSqlOperations(nameTransformer, s3Config.getS3Client(), s3Config, encryptionConfig);
final DataSource dataSource = getDataSource(config);
try {
final JdbcDatabase database = new DefaultJdbcDatabase(dataSource);
Expand Down Expand Up @@ -108,10 +125,12 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final S3DestinationConfig s3Config = getS3DestinationConfig(findS3Options(config));
final EncryptionConfig encryptionConfig = config.has("uploading_method") ?
EncryptionConfig.fromJson(config.get("uploading_method").get("encryption")) : new NoEncryption();
return new StagingConsumerFactory().create(
outputRecordCollector,
getDatabase(getDataSource(config)),
new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config),
new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, encryptionConfig),
getNamingResolver(),
CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX)),
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.redshift.manifest.Entry;
import io.airbyte.integrations.destination.redshift.manifest.Manifest;
import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption;
import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryptionBlobDecorator;
import io.airbyte.integrations.destination.s3.EncryptionConfig;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.S3StorageOperations;
import io.airbyte.integrations.destination.s3.credential.S3AccessKeyCredentialConfig;
import io.airbyte.integrations.destination.staging.StagingOperations;
import java.util.Base64;
import java.util.Base64.Encoder;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -26,18 +31,27 @@

public class RedshiftS3StagingSqlOperations extends RedshiftSqlOperations implements StagingOperations {

private static final Encoder BASE64_ENCODER = Base64.getEncoder();
private final NamingConventionTransformer nameTransformer;
private final S3StorageOperations s3StorageOperations;
private final S3DestinationConfig s3Config;
private final ObjectMapper objectMapper;
private final byte[] keyEncryptingKey;

public RedshiftS3StagingSqlOperations(NamingConventionTransformer nameTransformer,
AmazonS3 s3Client,
S3DestinationConfig s3Config) {
S3DestinationConfig s3Config,
final EncryptionConfig encryptionConfig) {
this.nameTransformer = nameTransformer;
this.s3StorageOperations = new S3StorageOperations(nameTransformer, s3Client, s3Config);
this.s3Config = s3Config;
this.objectMapper = new ObjectMapper();
if (encryptionConfig instanceof AesCbcEnvelopeEncryption e) {
this.s3StorageOperations.addBlobDecorator(new AesCbcEnvelopeEncryptionBlobDecorator(e.key()));
this.keyEncryptingKey = e.key();
} else {
this.keyEncryptingKey = null;
}
}

@Override
Expand Down Expand Up @@ -99,10 +113,18 @@ public void copyIntoTmpTableFromStage(JdbcDatabase database,

private void executeCopy(final String manifestPath, JdbcDatabase db, String schemaName, String tmpTableName) {
final S3AccessKeyCredentialConfig credentialConfig = (S3AccessKeyCredentialConfig) s3Config.getS3CredentialConfig();
final String encryptionClause;
if (keyEncryptingKey == null) {
encryptionClause = "";
} else {
encryptionClause = String.format(" encryption = (type = 'aws_cse' master_key = '%s')", BASE64_ENCODER.encodeToString(keyEncryptingKey));
}

final var copyQuery = String.format(
"""
COPY %s.%s FROM '%s'
CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s'
%s
CSV GZIP
REGION '%s' TIMEFORMAT 'auto'
STATUPDATE OFF
Expand All @@ -112,6 +134,7 @@ private void executeCopy(final String manifestPath, JdbcDatabase db, String sche
getFullS3Path(s3Config.getBucketName(), manifestPath),
credentialConfig.getAccessKeyId(),
credentialConfig.getSecretAccessKey(),
encryptionClause,
s3Config.getBucketRegion());

Exceptions.toRuntime(() -> db.execute(copyQuery));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,49 @@
"type": "boolean",
"description": "Whether to delete the staging files from S3 after completing the sync. See <a href=\"https://docs.airbyte.com/integrations/destinations/redshift/#:~:text=the%20root%20directory.-,Purge%20Staging%20Data,-Whether%20to%20delete\"> docs</a> for details.",
"default": true
},
"encryption": {
"title": "Encryption",
"type": "object",
"description": "How to encrypt the staging data",
"default": { "encryption_type": "none" },
"order": 7,
"oneOf": [
{
"title": "No encryption",
"description": "Staging data will be stored in plaintext.",
"type": "object",
"required": ["encryption_type"],
"properties": {
"encryption_type": {
"type": "string",
"const": "none",
"enum": ["none"],
"default": "none"
}
}
},
{
"title": "AES-CBC envelope encryption",
"description": "Staging data will be encrypted using AES-CBC envelope encryption.",
"type": "object",
"required": ["encryption_type"],
"properties": {
"encryption_type": {
"type": "string",
"const": "aes_cbc_envelope",
"enum": ["aes_cbc_envelope"],
"default": "aes_cbc_envelope"
},
"key_encrypting_key": {
"type": "string",
"title": "Key",
"description": "The key, base64-encoded. Must be either 128, 192, or 256 bits. Leave blank to have Airbyte generate an ephemeral key for each sync.",
"airbyte_secret": true
}
}
}
]
}
}
}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/redshift.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
|:--------|:------------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.3.41 | 2022-06-21 | [\#13675](https://github.com/airbytehq/airbyte/pull/13675) | Add an option to use encryption with staging in Redshift Destination |
| 0.3.40 | 2022-06-17 | [\#13753](https://github.com/airbytehq/airbyte/pull/13753) | Deprecate and remove PART_SIZE_MB fields from connectors based on StreamTransferManager |
| 0.3.39 | 2022-06-02 | [13415](https://github.com/airbytehq/airbyte/pull/13415) | Add dropdown to select Uploading Method. <br /> **PLEASE NOTICE**: After this update your **uploading method** will be set to **Standard**, you will need to reconfigure the method to use **S3 Staging** again. |
| 0.3.37 | 2022-05-23 | [13090](https://github.com/airbytehq/airbyte/pull/13090) | Removed redshiftDataTmpTableMode. Some refactoring. |
Expand Down

0 comments on commit aa28d44

Please sign in to comment.