From 5b86b2fa8837630e3725dd5ddae4882935c35342 Mon Sep 17 00:00:00 2001
From: Mark Paluch
Date: Thu, 28 Nov 2024 15:56:22 +0100
Subject: [PATCH] Polishing.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Switch to Flux.fromIterable(…) from StreamUtils in deleteAll(Iterable).
Use switch expressions, refine toList/toCollection arrangement.
Guard tests against absent ReplicaSet.

See #4838
Original pull request: #4843
---
 .../SimpleReactiveMongoRepository.java        | 73 ++++++++++++-------
 .../SimpleReactiveMongoRepositoryTests.java   |  3 +
 2 files changed, 48 insertions(+), 28 deletions(-)

diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java
index a134498f35..19171b7b13 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java
@@ -28,9 +28,9 @@
 import java.util.Optional;
 import java.util.function.Function;
 import java.util.function.UnaryOperator;
-import java.util.stream.Collectors;
 
 import org.reactivestreams.Publisher;
+
 import org.springframework.dao.IncorrectResultSizeDataAccessException;
 import org.springframework.dao.OptimisticLockingFailureException;
 import org.springframework.data.domain.Example;
@@ -47,7 +47,6 @@
 import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
 import org.springframework.data.mongodb.repository.query.MongoEntityInformation;
 import org.springframework.data.repository.query.FluentQuery;
-import org.springframework.data.util.StreamUtils;
 import org.springframework.lang.Nullable;
 import org.springframework.util.Assert;
 
@@ -264,8 +263,15 @@ public Mono<Void> deleteAllById(Iterable<? extends ID> ids) {
 
 		Assert.notNull(ids, "The given Iterable of Id's must not be null");
 
+		return deleteAllById(ids, getReadPreference());
+	}
+
+	@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+	private Mono<Void> deleteAllById(Iterable<? extends ID> ids, Optional<ReadPreference> readPreference) {
+
 		Query query = getIdQuery(ids);
-		getReadPreference().ifPresent(query::withReadPreference);
+		readPreference.ifPresent(query::withReadPreference);
+
 		return mongoOperations.remove(query, entityInformation.getJavaType(), entityInformation.getCollectionName())
 				.then();
 	}
@@ -274,10 +280,9 @@ public Mono<Void> deleteAll(Iterable<? extends T> entities) {
 
 		Assert.notNull(entities, "The given Iterable of entities must not be null");
 
-		Collection<ID> ids = StreamUtils.createStreamFromIterator(entities.iterator())
-				.map(entityInformation::getId).collect(Collectors.toList());
-
-		return deleteAllById(ids);
+		Optional<ReadPreference> readPreference = getReadPreference();
+		return Flux.fromIterable(entities).map(entityInformation::getRequiredId).collectList()
+				.flatMap(ids -> deleteAllById(ids, readPreference));
 	}
 
 	@Override
@@ -464,10 +469,10 @@ private Query getIdQuery(Iterable<? extends ID> ids) {
 
 	/**
 	 * Transform the elements emitted by this Flux into Publishers, then flatten these inner publishers into a single
-	 * Flux. The operation does not allow interleave between performing the map operation for the first and second source
-	 * element guaranteeing the mapping operation completed before subscribing to its following inners, that will then be
-	 * subscribed to eagerly emitting elements in order of their source.
-	 *
+	 * Flux. The operation does not allow interleaving between performing the map operation for the first and second
+	 * source element guaranteeing the mapping operation completed before subscribing to its following inners, that will
+	 * then be subscribed to eagerly emitting elements in order of their source.
+	 *
 	 * <pre class="code">
 	 * Flux.just(first-element).flatMap(...)
 	 *     .concatWith(Flux.fromIterable(remaining-elements).flatMapSequential(...))
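To illustrate the ordering contract described in the Javadoc above, here is a small standalone Reactor sketch that is not part of the patch; the element values and artificial delays are made up for the demonstration.

import java.time.Duration;
import java.util.List;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Standalone demonstration of the ordering contract documented above: the first
// element is mapped via flatMap before the remaining elements are subscribed to
// eagerly via flatMapSequential, and the output still follows source order.
class OrderedFlatMapSketch {

	public static void main(String[] args) {

		List<String> source = List.of("a", "b", "c", "d");

		// artificial, decreasing delays: later elements complete earlier
		Function<String, Mono<String>> mapper = value -> Mono.just(value.toUpperCase())
				.delayElement(Duration.ofMillis(100 - 20L * (value.charAt(0) - 'a')));

		Flux<String> first = Flux.just(source.get(0)).flatMap(mapper);
		Flux<String> rest = Flux.fromIterable(source.subList(1, source.size())).flatMapSequential(mapper);

		// prints [A, B, C, D] despite the reversed completion order
		System.out.println(first.concatWith(rest).collectList().block());
	}
}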
@@ -481,25 +486,23 @@ private Query getIdQuery(Iterable<? extends ID> ids) {
 	static <T> Flux<T> concatMapSequentially(List<T> source,
 			Function<? super T, ? extends Publisher<? extends T>> mapper) {
 
-		if (source.isEmpty()) {
-			return Flux.empty();
-		}
-		if (source.size() == 1) {
-			return Flux.just(source.iterator().next()).flatMap(mapper);
-		}
-		if (source.size() == 2) {
-			return Flux.fromIterable(source).concatMap(mapper);
-		}
+		return switch (source.size()) {
+			case 0 -> Flux.empty();
+			case 1 -> Flux.just(source.get(0)).flatMap(mapper);
+			case 2 -> Flux.fromIterable(source).concatMap(mapper);
+			default -> {
 
-		Flux<T> first = Flux.just(source.get(0)).flatMap(mapper);
-		Flux<T> theRest = Flux.fromIterable(source.subList(1, source.size())).flatMapSequential(mapper);
-		return first.concatWith(theRest);
+				Flux<T> first = Flux.just(source.get(0)).flatMap(mapper);
+				Flux<T> theRest = Flux.fromIterable(source.subList(1, source.size())).flatMapSequential(mapper);
+				yield first.concatWith(theRest);
+			}
+		};
 	}
 
 	static <T> Flux<T> concatMapSequentially(Publisher<T> publisher,
 			Function<? super T, ? extends Publisher<? extends T>> mapper) {
 
-		return Flux.from(publisher).switchOnFirst(((signal, source) -> {
+		return Flux.from(publisher).switchOnFirst((signal, source) -> {
 
 			if (!signal.hasValue()) {
 				return source.concatMap(mapper);
@@ -507,16 +510,30 @@ static <T> Flux<T> concatMapSequentially(Publisher<T> publisher,
 
 			Mono<T> firstCall = Mono.from(mapper.apply(signal.get()));
 			return firstCall.concatWith(source.skip(1).flatMapSequential(mapper));
-		}));
+		});
 	}
 
 	private static <T> List<T> toList(Iterable<T> source) {
-		return source instanceof List<T> list ? list : new ArrayList<>(toCollection(source));
+
+		Collection<T> collection = toCollection(source);
+
+		if (collection instanceof List<T> list) {
+			return list;
+		}
+
+		return new ArrayList<>(collection);
 	}
 
 	private static <T> Collection<T> toCollection(Iterable<T> source) {
-		return source instanceof Collection<T> collection ? collection
-				: StreamUtils.createStreamFromIterator(source.iterator()).collect(Collectors.toList());
+
+		if (source instanceof Collection<T> collection) {
+			return collection;
+		}
+
+		List<T> list = new ArrayList<>();
+		source.forEach(list::add);
+
+		return list;
 	}
 
 	/**
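The reworked deleteAll(Iterable) above replaces the StreamUtils-based id collection with a purely reactive composition, as called out in the commit message. The following self-contained sketch mirrors that shape; idOf and deleteAllById are hypothetical stand-ins for entityInformation::getRequiredId and the read-preference-aware batch delete used by the repository.

import java.util.List;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Shape of the reworked deleteAll(Iterable): collect the ids reactively, then
// issue a single delete for the whole batch. "idOf" and "deleteAllById" are
// hypothetical stand-ins for the repository's actual collaborators.
class DeleteAllSketch {

	static <T, ID> Mono<Void> deleteAll(Iterable<? extends T> entities, Function<T, ID> idOf,
			Function<List<ID>, Mono<Void>> deleteAllById) {

		return Flux.fromIterable(entities) //
				.map(idOf) //
				.collectList() //
				.flatMap(deleteAllById);
	}

	public static void main(String[] args) {

		// prints "deleting ids [1, 2, 3]" once the composed Mono is subscribed to
		deleteAll(List.of("a", "bb", "ccc"), String::length,
				ids -> Mono.<Void> fromRunnable(() -> System.out.println("deleting ids " + ids))).block();
	}
}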
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java
index c4a8c58e4b..32e14e38bf 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java
@@ -46,6 +46,7 @@
 import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
 import org.springframework.data.mongodb.repository.support.ReactiveMongoRepositoryFactory;
 import org.springframework.data.mongodb.repository.support.SimpleReactiveMongoRepository;
+import org.springframework.data.mongodb.test.util.EnableIfReplicaSetAvailable;
 import org.springframework.data.repository.query.FluentQuery;
 import org.springframework.data.repository.query.ReactiveQueryMethodEvaluationContextProvider;
 import org.springframework.lang.Nullable;
@@ -339,6 +340,7 @@ void savePublisherOfEntitiesShouldInsertEntity() {
 	}
 
 	@RepeatedTest(10) // GH-4838
+	@EnableIfReplicaSetAvailable
 	void transactionalSaveAllForStuffThatIsConsideredAnUpdateOfExistingData() {
 
 		ReactiveMongoTransactionManager txmgr = new ReactiveMongoTransactionManager(template.getMongoDatabaseFactory());
@@ -349,6 +351,7 @@ void transactionalSaveAllForStuffThatIsConsideredAnUpdateOfExistingData() {
 	}
 
 	@RepeatedTest(10) // GH-4838
+	@EnableIfReplicaSetAvailable
 	void transactionalSaveAllWithPublisherForStuffThatIsConsideredAnUpdateOfExistingData() {
 
 		ReactiveMongoTransactionManager txmgr = new ReactiveMongoTransactionManager(template.getMongoDatabaseFactory());
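The two guarded tests above open a MongoDB transaction, which is only available against a replica set or sharded cluster, hence the new @EnableIfReplicaSetAvailable annotation. Below is a minimal sketch of the transactional saveAll pattern such a test exercises, assuming a TransactionalOperator-based setup and placeholder PersonRepository/Person types; the actual tests' assertions are not reproduced here.

import java.util.List;

import org.springframework.data.mongodb.ReactiveMongoTransactionManager;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.transaction.reactive.TransactionalOperator;

import reactor.core.publisher.Flux;

// Minimal sketch of a transactional saveAll call. PersonRepository and Person are
// placeholders; MongoDB transactions require a replica set, which is why the tests
// above are now guarded with @EnableIfReplicaSetAvailable.
class TransactionalSaveAllSketch {

	interface PersonRepository extends ReactiveCrudRepository<Person, String> {}

	record Person(String id, String name) {}

	static Flux<Person> saveAllTransactionally(ReactiveMongoTemplate template, PersonRepository repository,
			List<Person> people) {

		ReactiveMongoTransactionManager txmgr = new ReactiveMongoTransactionManager(template.getMongoDatabaseFactory());
		TransactionalOperator rxtx = TransactionalOperator.create(txmgr);

		// all saves participate in a single MongoDB transaction and roll back together on error
		return repository.saveAll(people).as(rxtx::transactional);
	}
}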