diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniMemoizeOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniMemoizeOp.java index c89ae318c..f013818e3 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniMemoizeOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniMemoizeOp.java @@ -54,17 +54,19 @@ public void subscribe(UniSubscriber subscriber) { case INIT: state = State.WAITING_FOR_UPSTREAM; awaiters.add(subscriber); + subscriber.onSubscribe(new MemoizedSubscription(subscriber)); currentContext = subscriber.context(); upstream().subscribe().withSubscriber(this); break; case WAITING_FOR_UPSTREAM: awaiters.add(subscriber); + subscriber.onSubscribe(new MemoizedSubscription(subscriber)); break; case CACHING: + subscriber.onSubscribe(new MemoizedSubscription(subscriber)); forwardTo(subscriber); break; } - subscriber.onSubscribe(new MemoizedSubscription(subscriber)); } finally { internalLock.unlock(); } diff --git a/implementation/src/test/java/io/smallrye/mutiny/groups/UniMemoizeTest.java b/implementation/src/test/java/io/smallrye/mutiny/groups/UniMemoizeTest.java index e3bd16378..bcd9ec75d 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/groups/UniMemoizeTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/groups/UniMemoizeTest.java @@ -534,4 +534,19 @@ public void reproducer_1303() throws ExecutionException, InterruptedException { } } + @Test + public void checkProtocolCorrectness() { + var log = new ArrayList(); + var uni = Uni.createFrom().item(() -> 58).memoize().indefinitely() + .onSubscription().invoke(() -> log.add("sub")) + .onItem().invoke(n -> log.add(String.valueOf(n))); + + Integer res = uni.await().atMost(Duration.ofSeconds(5)); + assertThat(res).isEqualTo(58); + assertThat(log).containsExactly("sub", "58"); + + uni.await().atMost(Duration.ofSeconds(5)); + assertThat(res).isEqualTo(58); + assertThat(log).containsExactly("sub", "58", "sub", "58"); + } }