Skip to content

Commit

Permalink
Merge #226 into 3.3
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Mar 16, 2020
2 parents a46528f + 52c9086 commit ab1d3cb
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 11 deletions.
12 changes: 6 additions & 6 deletions reactor-extra/src/main/java/reactor/cache/CacheFlux.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,14 @@ else if (fromCache instanceof List) {
*/
public static <KEY, VALUE> FluxCacheBuilderCacheMiss<KEY, VALUE> lookup(
Function<KEY, Mono<List<Signal<VALUE>>>> reader, KEY key) {
return otherSupplier -> writer ->
return otherSupplier -> (BiFunction<KEY, List<Signal<VALUE>>, Mono<Void>> writer) ->
Flux.defer(() ->
reader.apply(key)
.switchIfEmpty(otherSupplier.get()
.materialize()
.collectList()
.flatMap(signals -> writer.apply(key, signals)
.then(Mono.just(signals))))
.switchIfEmpty(otherSupplier.get()
.materialize()
.collectList()
.flatMap(signals -> writer.apply(key, signals)
.then(Mono.just(signals))))
.flatMapIterable(Function.identity())
.dematerialize()
);
Expand Down
21 changes: 16 additions & 5 deletions reactor-extra/src/main/java/reactor/cache/CacheMono.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.util.function.Function;
import java.util.function.Supplier;

import javax.swing.SingleSelectionModel;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;

Expand Down Expand Up @@ -88,11 +91,19 @@ public class CacheMono {
* @see #lookup(Map, Object, Class)
*/
public static <KEY, VALUE> MonoCacheBuilderMapMiss<VALUE> lookup(Map<KEY, ? super Signal<? extends VALUE>> cacheMap, KEY key) {
return otherSupplier -> Mono.defer(() ->
Mono.justOrEmpty(cacheMap.get(key))
.switchIfEmpty(otherSupplier.get().materialize()
.doOnNext(value -> cacheMap.put(key, value)))
.dematerialize()
return otherSupplier -> Mono.defer(() -> {
Object fromCache = cacheMap.get(key);
if (fromCache == null) {
return otherSupplier.get()
.materialize()
.doOnNext(value -> cacheMap.put(key, value))
.dematerialize();
}
if (fromCache instanceof Signal) {
return Mono.just(fromCache).dematerialize();
}
throw new IllegalArgumentException("Content of cache for key " + key + " must be a Signal");
}
);
}

Expand Down
21 changes: 21 additions & 0 deletions reactor-extra/src/test/java/reactor/cache/CacheFluxTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -369,4 +370,24 @@ public void mapWithRelaxedTypes() {
assertThat(cacheMap).hasSize(1);
}


@Test
public void supplierNotEagerlyCalledIfDataInMapCache() {
AtomicBoolean supplierCalled = new AtomicBoolean();
Map<String, Object> genericMap = new HashMap<>();
genericMap.put("foo", Arrays.asList(Signal.next(123), Signal.next(456), Signal.complete()));

CacheFlux.lookup(genericMap, "foo", Integer.class)
.onCacheMissResume(() -> {
supplierCalled.set(true);
return Flux.just(100);
})
.as(StepVerifier::create)
.expectNext(123, 456)
.expectComplete()
.verify();

assertThat(supplierCalled).isFalse();
}

}
21 changes: 21 additions & 0 deletions reactor-extra/src/test/java/reactor/cache/CacheMonoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -400,6 +401,26 @@ public void genericObjectMap() {
assertThat(count).as("cache hit").hasValue(1);
}

//see https://github.com/reactor/reactor-addons/issues/226
@Test
public void supplierNotEagerlyCalledIfDataInMapCache() {
AtomicBoolean supplierCalled = new AtomicBoolean();
Map<String, Object> genericMap = new HashMap<>();
genericMap.put("foo", Signal.next(123));

CacheMono.lookup(genericMap, "foo", Integer.class)
.onCacheMissResume(() -> {
supplierCalled.set(true);
return Mono.just(100);
})
.as(StepVerifier::create)
.expectNext(123)
.expectComplete()
.verify();

assertThat(supplierCalled).isFalse();
}

private static <K, V> Function<K, Mono<Signal<? extends V>>> reader(Map<K, ? extends Signal<? extends V>> cache) {
return key -> Mono.justOrEmpty(cache.get(key));
}
Expand Down

0 comments on commit ab1d3cb

Please sign in to comment.