Skip to content

Commit

Permalink
Polishing.
Browse files Browse the repository at this point in the history
Switch to Flux.fromIterable(…) from StreamUtils in deleteAll(Iterable). Use switch expressions, refine toList/toCollection arrangement. Guard tests against absent ReplicaSet.

See #4838
Original pull request: #4843
  • Loading branch information
mp911de committed Nov 28, 2024
1 parent 721799d commit 5b86b2f
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import java.util.Optional;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import org.reactivestreams.Publisher;

import org.springframework.dao.IncorrectResultSizeDataAccessException;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.data.domain.Example;
Expand All @@ -47,7 +47,6 @@
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.data.mongodb.repository.query.MongoEntityInformation;
import org.springframework.data.repository.query.FluentQuery;
import org.springframework.data.util.StreamUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -264,8 +263,15 @@ public Mono<Void> deleteAllById(Iterable<? extends ID> ids) {

Assert.notNull(ids, "The given Iterable of Id's must not be null");

return deleteAllById(ids, getReadPreference());
}

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private Mono<Void> deleteAllById(Iterable<? extends ID> ids, Optional<ReadPreference> readPreference) {

Query query = getIdQuery(ids);
getReadPreference().ifPresent(query::withReadPreference);
readPreference.ifPresent(query::withReadPreference);

return mongoOperations.remove(query, entityInformation.getJavaType(), entityInformation.getCollectionName()).then();
}

Expand All @@ -274,10 +280,9 @@ public Mono<Void> deleteAll(Iterable<? extends T> entities) {

Assert.notNull(entities, "The given Iterable of entities must not be null");

Collection<? extends ID> ids = StreamUtils.createStreamFromIterator(entities.iterator())
.map(entityInformation::getId).collect(Collectors.toList());

return deleteAllById(ids);
Optional<ReadPreference> readPreference = getReadPreference();
return Flux.fromIterable(entities).map(entityInformation::getRequiredId).collectList()
.flatMap(ids -> deleteAllById(ids, readPreference));
}

@Override
Expand Down Expand Up @@ -464,10 +469,10 @@ private Query getIdQuery(Iterable<? extends ID> ids) {

/**
* Transform the elements emitted by this Flux into Publishers, then flatten these inner publishers into a single
* Flux. The operation does not allow interleave between performing the map operation for the first and second source
* element guaranteeing the mapping operation completed before subscribing to its following inners, that will then be
* subscribed to eagerly emitting elements in order of their source.
*
* Flux. The operation does not allow interleaving between performing the map operation for the first and second
* source element guaranteeing the mapping operation completed before subscribing to its following inners, that will
* then be subscribed to eagerly emitting elements in order of their source.
*
* <pre class="code">
* Flux.just(first-element).flatMap(...)
* .concatWith(Flux.fromIterable(remaining-elements).flatMapSequential(...))
Expand All @@ -481,42 +486,54 @@ private Query getIdQuery(Iterable<? extends ID> ids) {
static <T> Flux<T> concatMapSequentially(List<T> source,
Function<? super T, ? extends Publisher<? extends T>> mapper) {

if (source.isEmpty()) {
return Flux.empty();
}
if (source.size() == 1) {
return Flux.just(source.iterator().next()).flatMap(mapper);
}
if (source.size() == 2) {
return Flux.fromIterable(source).concatMap(mapper);
}
return switch (source.size()) {
case 0 -> Flux.empty();
case 1 -> Flux.just(source.get(0)).flatMap(mapper);
case 2 -> Flux.fromIterable(source).concatMap(mapper);
default -> {

Flux<T> first = Flux.just(source.get(0)).flatMap(mapper);
Flux<T> theRest = Flux.fromIterable(source.subList(1, source.size())).flatMapSequential(mapper);
return first.concatWith(theRest);
Flux<T> first = Flux.just(source.get(0)).flatMap(mapper);
Flux<T> theRest = Flux.fromIterable(source.subList(1, source.size())).flatMapSequential(mapper);
yield first.concatWith(theRest);
}
};
}

static <T> Flux<T> concatMapSequentially(Publisher<T> publisher,
Function<? super T, ? extends Publisher<? extends T>> mapper) {

return Flux.from(publisher).switchOnFirst(((signal, source) -> {
return Flux.from(publisher).switchOnFirst((signal, source) -> {

if (!signal.hasValue()) {
return source.concatMap(mapper);
}

Mono<T> firstCall = Mono.from(mapper.apply(signal.get()));
return firstCall.concatWith(source.skip(1).flatMapSequential(mapper));
}));
});
}

private static <E> List<E> toList(Iterable<E> source) {
return source instanceof List<E> list ? list : new ArrayList<>(toCollection(source));

Collection<E> collection = toCollection(source);

if (collection instanceof List<E> list) {
return list;
}

return new ArrayList<>(collection);
}

private static <E> Collection<E> toCollection(Iterable<E> source) {
return source instanceof Collection<E> collection ? collection
: StreamUtils.createStreamFromIterator(source.iterator()).collect(Collectors.toList());

if (source instanceof Collection<E> collection) {
return collection;
}

List<E> list = new ArrayList<>();
source.forEach(list::add);

return list;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.repository.support.ReactiveMongoRepositoryFactory;
import org.springframework.data.mongodb.repository.support.SimpleReactiveMongoRepository;
import org.springframework.data.mongodb.test.util.EnableIfReplicaSetAvailable;
import org.springframework.data.repository.query.FluentQuery;
import org.springframework.data.repository.query.ReactiveQueryMethodEvaluationContextProvider;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -339,6 +340,7 @@ void savePublisherOfEntitiesShouldInsertEntity() {
}

@RepeatedTest(10) // GH-4838
@EnableIfReplicaSetAvailable
void transactionalSaveAllForStuffThatIsConsideredAnUpdateOfExistingData() {

ReactiveMongoTransactionManager txmgr = new ReactiveMongoTransactionManager(template.getMongoDatabaseFactory());
Expand All @@ -349,6 +351,7 @@ void transactionalSaveAllForStuffThatIsConsideredAnUpdateOfExistingData() {
}

@RepeatedTest(10) // GH-4838
@EnableIfReplicaSetAvailable
void transactionalSaveAllWithPublisherForStuffThatIsConsideredAnUpdateOfExistingData() {

ReactiveMongoTransactionManager txmgr = new ReactiveMongoTransactionManager(template.getMongoDatabaseFactory());
Expand Down

0 comments on commit 5b86b2f

Please sign in to comment.