Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dataplane): deserialize secret depending on content #480

Merged
merged 2 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ public void initialize(ServiceExtensionContext context) {
}
var monitor = context.getMonitor();

var sourceFactory = new S3DataSourceFactory(awsClientProvider, monitor, vault, typeManager, validator);
var sourceFactory = new S3DataSourceFactory(awsClientProvider, monitor, vault, typeManager.getMapper(), validator);
pipelineService.registerFactory(sourceFactory);

var sinkFactory = new S3DataSinkFactory(awsClientProvider, executorService, monitor, vault, typeManager, chunkSizeInBytes, validator);
var sinkFactory = new S3DataSinkFactory(awsClientProvider, executorService, monitor, vault, typeManager.getMapper(), chunkSizeInBytes, validator);
pipelineService.registerFactory(sinkFactory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,26 @@
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
* ZF Friedrichshafen AG
* Cofinity-X - fix secret deserialization
*
*/

package org.eclipse.edc.connector.dataplane.aws.s3;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.edc.aws.s3.AwsClientProvider;
import org.eclipse.edc.aws.s3.AwsSecretToken;
import org.eclipse.edc.aws.s3.AwsTemporarySecretToken;
import org.eclipse.edc.aws.s3.S3ClientRequest;
import org.eclipse.edc.aws.s3.spi.S3BucketSchema;
import org.eclipse.edc.aws.s3.validation.S3DataAddressCredentialsValidator;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.SecretToken;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSink;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSinkFactory;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.security.Vault;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.eclipse.edc.util.string.StringUtils;
Expand All @@ -36,6 +38,7 @@
import org.eclipse.edc.validator.spi.Validator;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.util.concurrent.ExecutorService;

import static java.util.Optional.ofNullable;
Expand All @@ -54,17 +57,17 @@ public class S3DataSinkFactory implements DataSinkFactory {
private final ExecutorService executorService;
private final Monitor monitor;
private final Vault vault;
private final TypeManager typeManager;
private final ObjectMapper objectMapper;
private final int chunkSizeInBytes;
private final DataAddressValidatorRegistry dataAddressValidator;

public S3DataSinkFactory(AwsClientProvider clientProvider, ExecutorService executorService, Monitor monitor, Vault vault,
TypeManager typeManager, int chunkSizeInBytes, DataAddressValidatorRegistry dataAddressValidator) {
ObjectMapper objectMapper, int chunkSizeInBytes, DataAddressValidatorRegistry dataAddressValidator) {
this.clientProvider = clientProvider;
this.executorService = executorService;
this.monitor = monitor;
this.vault = vault;
this.typeManager = typeManager;
this.objectMapper = objectMapper;
this.chunkSizeInBytes = chunkSizeInBytes;
this.dataAddressValidator = dataAddressValidator;
}
Expand Down Expand Up @@ -116,7 +119,7 @@ private S3ClientRequest createS3ClientRequest(DataAddress address) {
.filter(keyName -> !StringUtils.isNullOrBlank(keyName))
.map(vault::resolveSecret)
.filter(secret -> !StringUtils.isNullOrBlank(secret))
.map(s -> typeManager.readValue(s, AwsTemporarySecretToken.class));
.map(this::deserializeSecretToken);

if (awsSecretToken.isPresent()) {
return S3ClientRequest.from(region, endpointOverride, awsSecretToken.get());
Expand All @@ -128,4 +131,17 @@ private S3ClientRequest createS3ClientRequest(DataAddress address) {
return S3ClientRequest.from(region, endpointOverride);
}
}

private SecretToken deserializeSecretToken(String secret) {
try {
var tree = objectMapper.readTree(secret);
if (tree.has("sessionToken")) {
return objectMapper.treeToValue(tree, AwsTemporarySecretToken.class);
} else {
return objectMapper.treeToValue(tree, AwsSecretToken.class);
}
} catch (IOException e) {
throw new EdcException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,26 @@
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
* ZF Friedrichshafen AG - Initial implementation
* Cofinity-X - fix secret deserialization
*
*/

package org.eclipse.edc.connector.dataplane.aws.s3;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.edc.aws.s3.AwsClientProvider;
import org.eclipse.edc.aws.s3.AwsSecretToken;
import org.eclipse.edc.aws.s3.AwsTemporarySecretToken;
import org.eclipse.edc.aws.s3.S3ClientRequest;
import org.eclipse.edc.aws.s3.spi.S3BucketSchema;
import org.eclipse.edc.aws.s3.validation.S3DataAddressCredentialsValidator;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.SecretToken;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSourceFactory;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.security.Vault;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.eclipse.edc.util.string.StringUtils;
Expand All @@ -36,6 +38,8 @@
import org.eclipse.edc.validator.spi.Validator;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;

import static java.util.Optional.ofNullable;
import static org.eclipse.edc.aws.s3.spi.S3BucketSchema.ACCESS_KEY_ID;
import static org.eclipse.edc.aws.s3.spi.S3BucketSchema.BUCKET_NAME;
Expand All @@ -51,14 +55,14 @@ public class S3DataSourceFactory implements DataSourceFactory {
private final AwsClientProvider clientProvider;
private final Monitor monitor;
private final Vault vault;
private final TypeManager typeManager;
private final ObjectMapper objectMapper;
private final DataAddressValidatorRegistry validator;

public S3DataSourceFactory(AwsClientProvider clientProvider, Monitor monitor, Vault vault, TypeManager typeManager, DataAddressValidatorRegistry validator) {
public S3DataSourceFactory(AwsClientProvider clientProvider, Monitor monitor, Vault vault, ObjectMapper objectMapper, DataAddressValidatorRegistry validator) {
this.clientProvider = clientProvider;
this.monitor = monitor;
this.vault = vault;
this.typeManager = typeManager;
this.objectMapper = objectMapper;
this.validator = validator;
}

Expand Down Expand Up @@ -105,7 +109,7 @@ private S3ClientRequest createS3ClientRequest(DataAddress address) {
.filter(keyName -> !StringUtils.isNullOrBlank(keyName))
.map(vault::resolveSecret)
.filter(secret -> !StringUtils.isNullOrBlank(secret))
.map(s -> typeManager.readValue(s, AwsTemporarySecretToken.class));
.map(this::deserializeSecretToken);

if (awsSecretToken.isPresent()) {
return S3ClientRequest.from(region, endpointOverride, awsSecretToken.get());
Expand All @@ -118,5 +122,17 @@ private S3ClientRequest createS3ClientRequest(DataAddress address) {
return S3ClientRequest.from(region, endpointOverride);
}
}


private SecretToken deserializeSecretToken(String secret) {
try {
var tree = objectMapper.readTree(secret);
if (tree.has("sessionToken")) {
return objectMapper.treeToValue(tree, AwsTemporarySecretToken.class);
} else {
return objectMapper.treeToValue(tree, AwsSecretToken.class);
}
} catch (IOException e) {
throw new EdcException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ void setup() {

var typeManager = new JacksonTypeManager();
var chunkSizeInBytes = 1024 * 1024 * 20;
sourceFactory = new S3DataSourceFactory(sourceClient.getClientProvider(), mock(), mock(), typeManager, validator);
sinkFactory = new S3DataSinkFactory(destinationClient.getClientProvider(), Executors.newSingleThreadExecutor(), mock(), mock(), typeManager, chunkSizeInBytes, validator);
sourceFactory = new S3DataSourceFactory(sourceClient.getClientProvider(), mock(), mock(), typeManager.getMapper(), validator);
sinkFactory = new S3DataSinkFactory(destinationClient.getClientProvider(), Executors.newSingleThreadExecutor(), mock(), mock(), typeManager.getMapper(), chunkSizeInBytes, validator);

sourceClient.createBucket(sourceBucketName);
destinationClient.createBucket(destinationBucketName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
* ZF Friedrichshafen AG
* Cofinity-X - additional test for secret deserialization
*
*/

Expand Down Expand Up @@ -49,8 +50,8 @@ class S3DataSinkFactoryTest {
private final TypeManager typeManager = new JacksonTypeManager();
private final DataAddressValidatorRegistry validator = mock();

private final S3DataSinkFactory factory = new S3DataSinkFactory(clientProvider, mock(), mock(), vault, typeManager,
1024, validator);
private final S3DataSinkFactory factory = new S3DataSinkFactory(clientProvider, mock(), mock(),
vault, typeManager.getMapper(), 1024, validator);

@Test
void canHandle_returnsTrueWhenExpectedType() {
Expand Down Expand Up @@ -93,6 +94,25 @@ void validate_shouldFail_whenValidatorFails() {
assertThat(result).isFailed();
verify(validator).validateDestination(destination);
}

@Test
void createSink_shouldGetTheSecretTokenFromTheVault() {
var destination = TestFunctions.s3DataAddressWithCredentials();
var secretToken = new AwsSecretToken("accessKeyId", "secretAccessKey");
when(vault.resolveSecret(destination.getKeyName())).thenReturn(typeManager.writeValueAsString(secretToken));
when(validator.validateDestination(any())).thenReturn(ValidationResult.success());
var request = createRequest(destination);

var sink = factory.createSink(request);

assertThat(sink).isNotNull().isInstanceOf(S3DataSink.class);
var captor = ArgumentCaptor.forClass(S3ClientRequest.class);
verify(clientProvider).s3Client(captor.capture());
var s3ClientRequest = captor.getValue();
assertThat(s3ClientRequest.region()).isEqualTo(TestFunctions.VALID_REGION);
assertThat(s3ClientRequest.secretToken()).isInstanceOf(AwsSecretToken.class);
assertThat(s3ClientRequest.endpointOverride()).isNull();
}

@Test
void createSink_shouldGetTheTemporarySecretTokenFromTheVault() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
* ZF Friedrichshafen AG - Initial implementation
* Cofinity-X - additional test for secret deserialization
*
*/

package org.eclipse.edc.connector.dataplane.aws.s3;

import org.eclipse.edc.aws.s3.AwsClientProvider;
import org.eclipse.edc.aws.s3.AwsSecretToken;
import org.eclipse.edc.aws.s3.AwsTemporarySecretToken;
import org.eclipse.edc.aws.s3.S3ClientRequest;
import org.eclipse.edc.aws.s3.spi.S3BucketSchema;
Expand Down Expand Up @@ -49,7 +51,8 @@ class S3DataSourceFactoryTest {
private final Vault vault = mock();
private final DataAddressValidatorRegistry validator = mock();

private final S3DataSourceFactory factory = new S3DataSourceFactory(clientProvider, mock(), vault, typeManager, validator);
private final S3DataSourceFactory factory = new S3DataSourceFactory(clientProvider, mock(),
vault, typeManager.getMapper(), validator);

@Test
void canHandle_returnsTrueWhenExpectedType() {
Expand Down Expand Up @@ -120,9 +123,28 @@ void createSink_shouldLetTheProviderGetTheCredentialsIfNotProvidedByTheAddress()
assertThat(s3ClientRequest.secretToken()).isNull();
assertThat(s3ClientRequest.endpointOverride()).isNull();
}

@Test
void createSource_shouldGetTheSecretTokenFromTheVault() {
when(validator.validateSource(any())).thenReturn(ValidationResult.success());
var source = TestFunctions.s3DataAddressWithCredentials();
var secretToken = new AwsSecretToken("accessKeyId", "secretAccessKey");
when(vault.resolveSecret(source.getKeyName())).thenReturn(typeManager.writeValueAsString(secretToken));
var request = createRequest(source);

var s3Source = factory.createSource(request);

assertThat(s3Source).isNotNull().isInstanceOf(S3DataSource.class);
var captor = ArgumentCaptor.forClass(S3ClientRequest.class);
verify(clientProvider).s3Client(captor.capture());
var s3ClientRequest = captor.getValue();
assertThat(s3ClientRequest.region()).isEqualTo(TestFunctions.VALID_REGION);
assertThat(s3ClientRequest.secretToken()).isInstanceOf(AwsSecretToken.class);
assertThat(s3ClientRequest.endpointOverride()).isNull();
}

@Test
void createSource_shouldGetTheTemporarySecretTokenFromTheVault() {
when(validator.validateSource(any())).thenReturn(ValidationResult.success());
var source = TestFunctions.s3DataAddressWithCredentials();
var temporaryKey = new AwsTemporarySecretToken("temporaryId", "temporarySecret", null, 0);
Expand Down
Loading