Skip to content

Commit

Permalink
Merge branch '6.1.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Mar 5, 2024
2 parents 6f7f032 + 988f363 commit 19b5f11
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.observability.DefaultSignalListener;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -90,6 +90,7 @@
* @author Sam Brannen
* @author Stephane Nicoll
* @author Sebastien Deleuze
* @author Simon Baslé
* @since 3.1
*/
public abstract class CacheAspectSupport extends AbstractCacheInvoker
Expand Down Expand Up @@ -1036,32 +1037,45 @@ public void performCachePut(@Nullable Object value) {


/**
* Reactive Streams Subscriber collection for collecting a List to cache.
* Reactor stateful SignalListener for collecting a List to cache.
*/
private class CachePutListSubscriber implements Subscriber<Object> {
private class CachePutSignalListener extends DefaultSignalListener<Object> {

private final CachePutRequest request;
private final AtomicReference<CachePutRequest> request;

private final List<Object> cacheValue = new ArrayList<>();

public CachePutListSubscriber(CachePutRequest request) {
this.request = request;
public CachePutSignalListener(CachePutRequest request) {
this.request = new AtomicReference<>(request);
}

@Override
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
public void doOnNext(Object o) {
this.cacheValue.add(o);
}

@Override
public void onNext(Object o) {
this.cacheValue.add(o);
public void doOnComplete() {
CachePutRequest r = this.request.get();
if (this.request.compareAndSet(r, null)) {
r.performCachePut(this.cacheValue);
}
}

@Override
public void onError(Throwable t) {
public void doOnCancel() {
// Note: we don't use doFinally as we want to propagate the signal after cache put, not before
CachePutRequest r = this.request.get();
if (this.request.compareAndSet(r, null)) {
r.performCachePut(this.cacheValue);
}
}

@Override
public void onComplete() {
this.request.performCachePut(this.cacheValue);
public void doOnError(Throwable error) {
if (this.request.getAndSet(null) != null) {
this.cacheValue.clear();
}
}
}

Expand Down Expand Up @@ -1145,9 +1159,8 @@ public Object processPutRequest(CachePutRequest request, @Nullable Object result
ReactiveAdapter adapter = (result != null ? this.registry.getAdapter(result.getClass()) : null);
if (adapter != null) {
if (adapter.isMultiValue()) {
Flux<?> source = Flux.from(adapter.toPublisher(result));
source.subscribe(new CachePutListSubscriber(request));
return adapter.fromPublisher(source);
return adapter.fromPublisher(Flux.from(adapter.toPublisher(result))
.tap(() -> new CachePutSignalListener(request)));
}
else {
return adapter.fromPublisher(Mono.from(adapter.toPublisher(result))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void cacheHitDetermination(Class<?> configClass) {
Long r3 = service.cacheFuture(key).join();

assertThat(r1).isNotNull();
assertThat(r1).isSameAs(r2).isSameAs(r3);
assertThat(r1).as("cacheFuture").isSameAs(r2).isSameAs(r3);

key = new Object();

Expand All @@ -70,7 +70,7 @@ void cacheHitDetermination(Class<?> configClass) {
r3 = service.cacheMono(key).block();

assertThat(r1).isNotNull();
assertThat(r1).isSameAs(r2).isSameAs(r3);
assertThat(r1).as("cacheMono").isSameAs(r2).isSameAs(r3);

key = new Object();

Expand All @@ -79,7 +79,7 @@ void cacheHitDetermination(Class<?> configClass) {
r3 = service.cacheFlux(key).blockFirst();

assertThat(r1).isNotNull();
assertThat(r1).isSameAs(r2).isSameAs(r3);
assertThat(r1).as("cacheFlux blockFirst").isSameAs(r2).isSameAs(r3);

key = new Object();

Expand All @@ -88,7 +88,7 @@ void cacheHitDetermination(Class<?> configClass) {
List<Long> l3 = service.cacheFlux(key).collectList().block();

assertThat(l1).isNotNull();
assertThat(l1).isEqualTo(l2).isEqualTo(l3);
assertThat(l1).as("cacheFlux collectList").isEqualTo(l2).isEqualTo(l3);

key = new Object();

Expand All @@ -97,7 +97,7 @@ void cacheHitDetermination(Class<?> configClass) {
r3 = service.cacheMono(key).block();

assertThat(r1).isNotNull();
assertThat(r1).isSameAs(r2).isSameAs(r3);
assertThat(r1).as("cacheMono common key").isSameAs(r2).isSameAs(r3);

// Same key as for Mono, reusing its cached value

Expand All @@ -106,12 +106,11 @@ void cacheHitDetermination(Class<?> configClass) {
r3 = service.cacheFlux(key).blockFirst();

assertThat(r1).isNotNull();
assertThat(r1).isSameAs(r2).isSameAs(r3);
assertThat(r1).as("cacheFlux blockFirst common key").isSameAs(r2).isSameAs(r3);

ctx.close();
}


@CacheConfig(cacheNames = "first")
static class ReactiveCacheableService {

Expand All @@ -124,12 +123,16 @@ CompletableFuture<Long> cacheFuture(Object arg) {

@Cacheable
Mono<Long> cacheMono(Object arg) {
return Mono.just(this.counter.getAndIncrement());
// here counter not only reflects invocations of cacheMono but subscriptions to
// the returned Mono as well. See https://github.com/spring-projects/spring-framework/issues/32370
return Mono.defer(() -> Mono.just(this.counter.getAndIncrement()));
}

@Cacheable
Flux<Long> cacheFlux(Object arg) {
return Flux.just(this.counter.getAndIncrement(), 0L);
// here counter not only reflects invocations of cacheFlux but subscriptions to
// the returned Flux as well. See https://github.com/spring-projects/spring-framework/issues/32370
return Flux.defer(() -> Flux.just(this.counter.getAndIncrement(), 0L));
}
}

Expand Down

0 comments on commit 19b5f11

Please sign in to comment.