Skip to content

Commit

Permalink
Declarative way for setting MongoDB transaction options.
Browse files Browse the repository at this point in the history
  • Loading branch information
ykardziyaka committed Nov 11, 2023
1 parent 5e76c04 commit 3fe9fbf
Show file tree
Hide file tree
Showing 8 changed files with 786 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ protected void doBegin(Object transaction, TransactionDefinition definition) thr
}

try {
mongoTransactionObject.startTransaction(options);
mongoTransactionObject.startTransaction(MongoTransactionUtils.extractOptions(definition, options));
} catch (MongoException ex) {
throw new TransactionSystemException(String.format("Could not start Mongo transaction for session %s.",
debugString(mongoTransactionObject.getSession())), ex);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.lang.Nullable;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.interceptor.TransactionAttribute;

import com.mongodb.ReadConcern;
import com.mongodb.ReadConcernLevel;
import com.mongodb.ReadPreference;
import com.mongodb.TransactionOptions;
import com.mongodb.WriteConcern;

/**
* Helper class for translating @Transactional labels into Mongo-specific {@link TransactionOptions}.
*
* @author Yan Kardziyaka
*/
public final class MongoTransactionUtils {
private static final Log LOGGER = LogFactory.getLog(MongoTransactionUtils.class);

private static final String MAX_COMMIT_TIME = "mongo:maxCommitTime";

private static final String READ_CONCERN_OPTION = "mongo:readConcern";

private static final String READ_PREFERENCE_OPTION = "mongo:readPreference";

private static final String WRITE_CONCERN_OPTION = "mongo:writeConcern";

private MongoTransactionUtils() {}

@Nullable
public static TransactionOptions extractOptions(TransactionDefinition transactionDefinition,
@Nullable TransactionOptions fallbackOptions) {
if (transactionDefinition instanceof TransactionAttribute transactionAttribute) {
TransactionOptions.Builder builder = null;
for (String label : transactionAttribute.getLabels()) {
String[] tokens = label.split("=", 2);
builder = tokens.length == 2 ? enhanceWithProperty(builder, tokens[0], tokens[1]) : builder;
}
if (builder == null) {
return fallbackOptions;
}
TransactionOptions options = builder.build();
return fallbackOptions == null ? options : TransactionOptions.merge(options, fallbackOptions);
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("%s cannot be casted to %s. Transaction labels won't be evaluated as options".formatted(
TransactionDefinition.class.getName(), TransactionAttribute.class.getName()));
}
return fallbackOptions;
}
}

@Nullable
private static TransactionOptions.Builder enhanceWithProperty(@Nullable TransactionOptions.Builder builder,
String key, String value) {
return switch (key) {
case MAX_COMMIT_TIME -> nullSafe(builder).maxCommitTime(Duration.parse(value).toMillis(), TimeUnit.MILLISECONDS);
case READ_CONCERN_OPTION -> nullSafe(builder).readConcern(new ReadConcern(ReadConcernLevel.fromString(value)));
case READ_PREFERENCE_OPTION -> nullSafe(builder).readPreference(ReadPreference.valueOf(value));
case WRITE_CONCERN_OPTION -> nullSafe(builder).writeConcern(getWriteConcern(value));
default -> builder;
};
}

private static TransactionOptions.Builder nullSafe(@Nullable TransactionOptions.Builder builder) {
return builder == null ? TransactionOptions.builder() : builder;
}

private static WriteConcern getWriteConcern(String writeConcernAsString) {
WriteConcern writeConcern = WriteConcern.valueOf(writeConcernAsString);
if (writeConcern == null) {
throw new IllegalArgumentException("'%s' is not a valid WriteConcern".formatted(writeConcernAsString));
}
return writeConcern;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ protected Mono<Void> doBegin(TransactionSynchronizationManager synchronizationMa

}).doOnNext(resourceHolder -> {

mongoTransactionObject.startTransaction(options);
mongoTransactionObject.startTransaction(MongoTransactionUtils.extractOptions(definition, options));

if (logger.isDebugEnabled()) {
logger.debug(String.format("Started transaction for session %s.", debugString(resourceHolder.getSession())));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb;

import static org.assertj.core.api.Assertions.*;

import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.TransactionOptions;
import com.mongodb.WriteConcern;

/**
* @author Yan Kardziyaka
*/
class MongoTransactionUtilsUnitTests {

@Test // GH-1628
void shouldThrowIllegalArgumentExceptionIfLabelsContainInvalidMaxCommitTime() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setLabels(Set.of("mongo:maxCommitTime=-PT5S"));

assertThatThrownBy(() -> MongoTransactionUtils.extractOptions(attribute, fallbackOptions))
.isInstanceOf(IllegalArgumentException.class);
}

@Test // GH-1628
void shouldThrowIllegalArgumentExceptionIfLabelsContainInvalidReadConcern() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setLabels(Set.of("mongo:readConcern=invalidValue"));

assertThatThrownBy(() -> MongoTransactionUtils.extractOptions(attribute, fallbackOptions))
.isInstanceOf(IllegalArgumentException.class);
}

@Test // GH-1628
void shouldThrowIllegalArgumentExceptionIfLabelsContainInvalidReadPreference() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setLabels(Set.of("mongo:readPreference=invalidValue"));

assertThatThrownBy(() -> MongoTransactionUtils.extractOptions(attribute, fallbackOptions))
.isInstanceOf(IllegalArgumentException.class);
}

@Test // GH-1628
void shouldThrowIllegalArgumentExceptionIfLabelsContainInvalidWriteConcern() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setLabels(Set.of("mongo:writeConcern=invalidValue"));

assertThatThrownBy(() -> MongoTransactionUtils.extractOptions(attribute, fallbackOptions))
.isInstanceOf(IllegalArgumentException.class);
}

@Test // GH-1628
void shouldReturnFallbackOptionsIfNotTransactionAttribute() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();

TransactionOptions result = MongoTransactionUtils.extractOptions(definition, fallbackOptions);

assertThat(result).isSameAs(fallbackOptions);
}

@Test // GH-1628
void shouldReturnFallbackOptionsIfNoLabelsProvided() {
TransactionOptions fallbackOptions = getTransactionOptions();
TransactionAttribute attribute = new DefaultTransactionAttribute();

TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);

assertThat(result).isSameAs(fallbackOptions);
}

@Test // GH-1628
void shouldReturnFallbackOptionsIfLabelsDoesNotContainValidOptions() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
Set<String> labels = Set.of("mongo:readConcern", "writeConcern", "readPreference=SECONDARY",
"mongo:maxCommitTime PT5M", UUID.randomUUID().toString());
attribute.setLabels(labels);

TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);

assertThat(result).isSameAs(fallbackOptions);
}

@Test // GH-1628
void shouldReturnMergedOptionsIfLabelsContainMaxCommitTime() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setLabels(Set.of("mongo:maxCommitTime=PT5S"));

TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);

assertThat(result).isNotSameAs(fallbackOptions)
.returns(5L, from(options -> options.getMaxCommitTime(TimeUnit.SECONDS)))
.returns(ReadConcern.AVAILABLE, from(TransactionOptions::getReadConcern))
.returns(ReadPreference.secondaryPreferred(), from(TransactionOptions::getReadPreference))
.returns(WriteConcern.UNACKNOWLEDGED, from(TransactionOptions::getWriteConcern));
}

@Test // GH-1628
void shouldReturnMergedOptionsIfLabelsContainReadConcern() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setLabels(Set.of("mongo:readConcern=majority"));

TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);

assertThat(result).isNotSameAs(fallbackOptions)
.returns(1L, from(options -> options.getMaxCommitTime(TimeUnit.MINUTES)))
.returns(ReadConcern.MAJORITY, from(TransactionOptions::getReadConcern))
.returns(ReadPreference.secondaryPreferred(), from(TransactionOptions::getReadPreference))
.returns(WriteConcern.UNACKNOWLEDGED, from(TransactionOptions::getWriteConcern));
}

@Test // GH-1628
void shouldReturnMergedOptionsIfLabelsContainReadPreference() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setLabels(Set.of("mongo:readPreference=primaryPreferred"));

TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);

assertThat(result).isNotSameAs(fallbackOptions)
.returns(1L, from(options -> options.getMaxCommitTime(TimeUnit.MINUTES)))
.returns(ReadConcern.AVAILABLE, from(TransactionOptions::getReadConcern))
.returns(ReadPreference.primaryPreferred(), from(TransactionOptions::getReadPreference))
.returns(WriteConcern.UNACKNOWLEDGED, from(TransactionOptions::getWriteConcern));
}

@Test // GH-1628
void shouldReturnMergedOptionsIfLabelsContainWriteConcern() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setLabels(Set.of("mongo:writeConcern=w3"));

TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);

assertThat(result).isNotSameAs(fallbackOptions)
.returns(1L, from(options -> options.getMaxCommitTime(TimeUnit.MINUTES)))
.returns(ReadConcern.AVAILABLE, from(TransactionOptions::getReadConcern))
.returns(ReadPreference.secondaryPreferred(), from(TransactionOptions::getReadPreference))
.returns(WriteConcern.W3, from(TransactionOptions::getWriteConcern));
}

@Test // GH-1628
void shouldReturnNewOptionsIfLabelsContainAllOptions() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
Set<String> labels = Set.of("mongo:maxCommitTime=PT5S", "mongo:readConcern=majority",
"mongo:readPreference=primaryPreferred", "mongo:writeConcern=w3");
attribute.setLabels(labels);

TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);

assertThat(result).isNotSameAs(fallbackOptions)
.returns(5L, from(options -> options.getMaxCommitTime(TimeUnit.SECONDS)))
.returns(ReadConcern.MAJORITY, from(TransactionOptions::getReadConcern))
.returns(ReadPreference.primaryPreferred(), from(TransactionOptions::getReadPreference))
.returns(WriteConcern.W3, from(TransactionOptions::getWriteConcern));
}

@Test // GH-1628
void shouldReturnMergedOptionsIfLabelsContainOptionsMixedWithOrdinaryStrings() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
Set<String> labels = Set.of("mongo:maxCommitTime=PT5S", "mongo:nonExistentOption=value", "label",
"mongo:writeConcern=w3");
attribute.setLabels(labels);

TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);

assertThat(result).isNotSameAs(fallbackOptions)
.returns(5L, from(options -> options.getMaxCommitTime(TimeUnit.SECONDS)))
.returns(ReadConcern.AVAILABLE, from(TransactionOptions::getReadConcern))
.returns(ReadPreference.secondaryPreferred(), from(TransactionOptions::getReadPreference))
.returns(WriteConcern.W3, from(TransactionOptions::getWriteConcern));
}

@Test // GH-1628
void shouldReturnNewOptionsIFallbackIsNull() {
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
Set<String> labels = Set.of("mongo:maxCommitTime=PT5S", "mongo:writeConcern=w3");
attribute.setLabels(labels);

TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, null);

assertThat(result).returns(5L, from(options -> options.getMaxCommitTime(TimeUnit.SECONDS)))
.returns(null, from(TransactionOptions::getReadConcern))
.returns(null, from(TransactionOptions::getReadPreference))
.returns(WriteConcern.W3, from(TransactionOptions::getWriteConcern));
}

private TransactionOptions getTransactionOptions() {
return TransactionOptions.builder()
.maxCommitTime(1L, TimeUnit.MINUTES)
.readConcern(ReadConcern.AVAILABLE)
.readPreference(ReadPreference.secondaryPreferred())
.writeConcern(WriteConcern.UNACKNOWLEDGED).build();
}
}
Loading

0 comments on commit 3fe9fbf

Please sign in to comment.