Skip to content

Commit

Permalink
Avoid duplicate upstream subscription during reactive cache put
Browse files Browse the repository at this point in the history
This commit fixes an issue where a Cacheable method which returns a
Flux (or multi-value publisher) will be invoked once, but the returned
publisher is actually subscribed twice.

By using the Reactor `tap` operator, we ensure that we can emit values
downstream AND accumulate emitted values into the List with a single
subscription.

The SignalListener additionally handles scenarios involving cancel,
for instance in case of a `take(1)` in the chain. In that case values
emitted up until that point will have been stored into the List buffer,
so we can still put it in the cache. In case of error, no caching occurs
and the internal buffer is cleared. This implementation also protects
against competing onComplete/onError signals and cancel signals.

Closes spring-projectsgh-32370
  • Loading branch information
simonbasle committed Mar 5, 2024
1 parent a0ae849 commit 988f363
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.observability.DefaultSignalListener;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -90,6 +90,7 @@
* @author Sam Brannen
* @author Stephane Nicoll
* @author Sebastien Deleuze
* @author Simon Baslé
* @since 3.1
*/
public abstract class CacheAspectSupport extends AbstractCacheInvoker
Expand Down Expand Up @@ -1036,32 +1037,45 @@ public void performCachePut(@Nullable Object value) {


/**
* Reactive Streams Subscriber collection for collecting a List to cache.
* Reactor stateful SignalListener for collecting a List to cache.
*/
private class CachePutListSubscriber implements Subscriber<Object> {
private class CachePutSignalListener extends DefaultSignalListener<Object> {

private final CachePutRequest request;
private final AtomicReference<CachePutRequest> request;

private final List<Object> cacheValue = new ArrayList<>();

public CachePutListSubscriber(CachePutRequest request) {
this.request = request;
public CachePutSignalListener(CachePutRequest request) {
this.request = new AtomicReference<>(request);
}

@Override
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
public void doOnNext(Object o) {
this.cacheValue.add(o);
}

@Override
public void onNext(Object o) {
this.cacheValue.add(o);
public void doOnComplete() {
CachePutRequest r = this.request.get();
if (this.request.compareAndSet(r, null)) {
r.performCachePut(this.cacheValue);
}
}

@Override
public void onError(Throwable t) {
public void doOnCancel() {
// Note: we don't use doFinally as we want to propagate the signal after cache put, not before
CachePutRequest r = this.request.get();
if (this.request.compareAndSet(r, null)) {
r.performCachePut(this.cacheValue);
}
}

@Override
public void onComplete() {
this.request.performCachePut(this.cacheValue);
public void doOnError(Throwable error) {
if (this.request.getAndSet(null) != null) {
this.cacheValue.clear();
}
}
}

Expand Down Expand Up @@ -1145,9 +1159,8 @@ public Object processPutRequest(CachePutRequest request, @Nullable Object result
ReactiveAdapter adapter = (result != null ? this.registry.getAdapter(result.getClass()) : null);
if (adapter != null) {
if (adapter.isMultiValue()) {
Flux<?> source = Flux.from(adapter.toPublisher(result));
source.subscribe(new CachePutListSubscriber(request));
return adapter.fromPublisher(source);
return adapter.fromPublisher(Flux.from(adapter.toPublisher(result))
.tap(() -> new CachePutSignalListener(request)));
}
else {
return adapter.fromPublisher(Mono.from(adapter.toPublisher(result))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void cacheHitDetermination(Class<?> configClass) {
Long r3 = service.cacheFuture(key).join();

assertThat(r1).isNotNull();
assertThat(r1).isSameAs(r2).isSameAs(r3);
assertThat(r1).as("cacheFuture").isSameAs(r2).isSameAs(r3);

key = new Object();

Expand All @@ -70,7 +70,7 @@ void cacheHitDetermination(Class<?> configClass) {
r3 = service.cacheMono(key).block();

assertThat(r1).isNotNull();
assertThat(r1).isSameAs(r2).isSameAs(r3);
assertThat(r1).as("cacheMono").isSameAs(r2).isSameAs(r3);

key = new Object();

Expand All @@ -79,7 +79,7 @@ void cacheHitDetermination(Class<?> configClass) {
r3 = service.cacheFlux(key).blockFirst();

assertThat(r1).isNotNull();
assertThat(r1).isSameAs(r2).isSameAs(r3);
assertThat(r1).as("cacheFlux blockFirst").isSameAs(r2).isSameAs(r3);

key = new Object();

Expand All @@ -88,7 +88,7 @@ void cacheHitDetermination(Class<?> configClass) {
List<Long> l3 = service.cacheFlux(key).collectList().block();

assertThat(l1).isNotNull();
assertThat(l1).isEqualTo(l2).isEqualTo(l3);
assertThat(l1).as("cacheFlux collectList").isEqualTo(l2).isEqualTo(l3);

key = new Object();

Expand All @@ -97,7 +97,7 @@ void cacheHitDetermination(Class<?> configClass) {
r3 = service.cacheMono(key).block();

assertThat(r1).isNotNull();
assertThat(r1).isSameAs(r2).isSameAs(r3);
assertThat(r1).as("cacheMono common key").isSameAs(r2).isSameAs(r3);

// Same key as for Mono, reusing its cached value

Expand All @@ -106,12 +106,11 @@ void cacheHitDetermination(Class<?> configClass) {
r3 = service.cacheFlux(key).blockFirst();

assertThat(r1).isNotNull();
assertThat(r1).isSameAs(r2).isSameAs(r3);
assertThat(r1).as("cacheFlux blockFirst common key").isSameAs(r2).isSameAs(r3);

ctx.close();
}


@CacheConfig(cacheNames = "first")
static class ReactiveCacheableService {

Expand All @@ -124,12 +123,16 @@ CompletableFuture<Long> cacheFuture(Object arg) {

@Cacheable
Mono<Long> cacheMono(Object arg) {
return Mono.just(this.counter.getAndIncrement());
// here counter not only reflects invocations of cacheMono but subscriptions to
// the returned Mono as well. See https://github.com/spring-projects/spring-framework/issues/32370
return Mono.defer(() -> Mono.just(this.counter.getAndIncrement()));
}

@Cacheable
Flux<Long> cacheFlux(Object arg) {
return Flux.just(this.counter.getAndIncrement(), 0L);
// here counter not only reflects invocations of cacheFlux but subscriptions to
// the returned Flux as well. See https://github.com/spring-projects/spring-framework/issues/32370
return Flux.defer(() -> Flux.just(this.counter.getAndIncrement(), 0L));
}
}

Expand Down

0 comments on commit 988f363

Please sign in to comment.