Skip to content

Commit

Permalink
Removing fusion from MonoSubscriber (#3245)
Browse files Browse the repository at this point in the history
Since Mono deals with at most one item, a notion of a queue is not justified. MonoSubscriber supported the notion of async fusion, which requires unnecessary operations to support it. This contributes to a performance degradation instead of improving it. Async fusion has been removed in 3.5.0 line as part of a rework of Mono stack to make them more lazy (compute result after request has been issued, instead of at subscription time). The asynchronous fusion aspect can be also removed as part of the 3.4.x release line to improve performance of existing usages, as it does not change the behaviour nor the APIs.

Fixes #3239
  • Loading branch information
UgiR authored Oct 27, 2022
1 parent 649904b commit b70f94e
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 340 deletions.
37 changes: 2 additions & 35 deletions reactor-core/src/main/java/reactor/core/publisher/Operators.java
Original file line number Diff line number Diff line change
Expand Up @@ -1776,7 +1776,6 @@ public Object scanUnsafe(Attr key) {

@Override
public final void clear() {
STATE.lazySet(this, FUSED_ASYNC_CONSUMED);
this.value = null;
}

Expand All @@ -1790,18 +1789,6 @@ public final void clear() {
public final void complete(@Nullable O v) {
for (; ; ) {
int state = this.state;
if (state == FUSED_ASYNC_EMPTY) {
setValue(v);
//sync memory since setValue is non volatile
if (STATE.compareAndSet(this, FUSED_ASYNC_EMPTY, FUSED_ASYNC_READY)) {
Subscriber<? super O> a = actual;
a.onNext(null);
a.onComplete();
return;
}
//refresh state if race occurred so we test if cancelled in the next comparison
state = this.state;
}

// if state is >= HAS_CANCELLED or bit zero is set (*_HAS_VALUE) case, return
if ((state & ~HAS_REQUEST_NO_VALUE) != 0) {
Expand Down Expand Up @@ -1850,7 +1837,7 @@ public final boolean isCancelled() {

@Override
public final boolean isEmpty() {
return this.state != FUSED_ASYNC_READY;
return true;
}

@Override
Expand All @@ -1877,11 +1864,6 @@ public void onSubscribe(Subscription s) {
@Override
@Nullable
public final O poll() {
if (STATE.compareAndSet(this, FUSED_ASYNC_READY, FUSED_ASYNC_CONSUMED)) {
O v = value;
value = null;
return v;
}
return null;
}

Expand Down Expand Up @@ -1917,10 +1899,6 @@ public void request(long n) {

@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
STATE.lazySet(this, FUSED_ASYNC_EMPTY);
return ASYNC;
}
return NONE;
}

Expand Down Expand Up @@ -1964,18 +1942,7 @@ public int size() {
* Indicates the Subscription has been cancelled.
*/
static final int CANCELLED = 4;
/**
* Indicates this Subscription is in ASYNC fusion mode and is currently empty.
*/
static final int FUSED_ASYNC_EMPTY = 8;
/**
* Indicates this Subscription is in ASYNC fusion mode and has a value.
*/
static final int FUSED_ASYNC_READY = 16;
/**
* Indicates this Subscription is in ASYNC fusion mode and its value has been consumed.
*/
static final int FUSED_ASYNC_CONSUMED = 32;

@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<MonoSubscriber> STATE =
AtomicIntegerFieldUpdater.newUpdater(MonoSubscriber.class, "state");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,17 @@ public class FluxDoOnEachTest {
void doOnEachAsyncFusionDoesntTriggerOnNextTwice() {
List<String> signals = new ArrayList<>();
StepVerifier.create(Flux.just("a", "b", "c")
.collectList()
.limitRate(3)
.doOnEach(sig -> signals.add(sig.toString()))
)
.expectFusion(Fuseable.ASYNC)
.expectNext(Arrays.asList("a", "b", "c"))
.expectNext("a", "b", "c")
.verifyComplete();

assertThat(signals).containsExactly(
"doOnEach_onNext([a, b, c])",
"doOnEach_onNext(a)",
"doOnEach_onNext(b)",
"doOnEach_onNext(c)",
"onComplete()"
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,23 +221,6 @@ public void discardElementAndBufferOnAccumulatorLateFailure() {
.hasDiscardedExactly(1, 2, 3);
}

@Test
public void discardElementAndBufferOnAccumulatorLateFailure_fused() {
Flux.just(1, 2, 3, 4)
.collect(ArrayList::new, (l, t) -> {
if (t == 3) {
throw new IllegalStateException("accumulator: boom");
}
l.add(t);
})
.as(StepVerifier::create)
//WARNING: we need to request fusion so this expectFusion is important
.expectFusion(Fuseable.ASYNC)
.expectErrorMessage("accumulator: boom")
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3);
}

@Test
public void discardListElementsOnError() {
Mono<List<Integer>> test =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2015-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.test.StepVerifier;
import reactor.test.subscriber.AssertSubscriber;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,53 +90,6 @@ public void onSuccessNormalConditional() {

}

@Test
public void onSuccessFusion() {
LongAdder invoked = new LongAdder();
AtomicBoolean hasNull = new AtomicBoolean();

Mono<Integer> mono = Flux
.range(1, 10)
.reduce((a, b) -> a + b)
.doOnSuccess(v -> {
if (v == null) hasNull.set(true);
invoked.increment();
});

StepVerifier.create(mono)
.expectFusion(Fuseable.ASYNC)
.expectNext(55)
.expectComplete()
.verify();

assertThat(hasNull.get()).as("unexpected call to onSuccess with null").isFalse();
assertThat(invoked.intValue()).isEqualTo(1);
}

@Test
public void onSuccessFusionConditional() {
LongAdder invoked = new LongAdder();
AtomicBoolean hasNull = new AtomicBoolean();

Mono<Integer> mono = Flux
.range(1, 10)
.reduce((a, b) -> a + b)
.filter(v -> true)
.doOnSuccess(v -> {
if (v == null) hasNull.set(true);
invoked.increment();
});

StepVerifier.create(mono)
.expectFusion()
.expectNext(55)
.expectComplete()
.verify();

assertThat(hasNull.get()).as("unexpected call to onSuccess with null").isFalse();
assertThat(invoked.intValue()).isEqualTo(1);
}

@Test
public void onSuccessOrErrorNormal() {
LongAdder invoked = new LongAdder();
Expand Down Expand Up @@ -194,61 +147,6 @@ public void onSuccessOrErrorNormalConditional() {
assertThat(error).as("unexpected error").hasValue(null);
}

@Test
public void onSuccessOrErrorFusion() {
LongAdder invoked = new LongAdder();
AtomicBoolean completedEmpty = new AtomicBoolean();
AtomicReference<Throwable> error = new AtomicReference<>();

@SuppressWarnings("deprecation") // Because of doOnSuccessOrError, which will be removed in 3.5.0
Mono<Integer> mono = Flux
.range(1, 10)
.reduce((a, b) -> a + b)
.doOnSuccessOrError((v, t) -> {
if (v == null && t == null) completedEmpty.set(true);
if (t != null) error.set(t);
invoked.increment();
});

StepVerifier.create(mono)
.expectFusion()
.expectNext(55)
.expectComplete()
.verify();

assertThat(completedEmpty.get()).as("unexpected empty completion").isFalse();
assertThat(invoked.intValue()).isEqualTo(1);
assertThat(error).as("unexpected error").hasValue(null);
}

@Test
public void onSuccessOrErrorFusionConditional() {
LongAdder invoked = new LongAdder();
AtomicBoolean completedEmpty = new AtomicBoolean();
AtomicReference<Throwable> error = new AtomicReference<>();

@SuppressWarnings("deprecation") // Because of doOnSuccessOrError, which will be removed in 3.5.0
Mono<Integer> mono = Flux
.range(1, 10)
.reduce((a, b) -> a + b)
.filter(v -> true)
.doOnSuccessOrError((v, t) -> {
if (v == null && t == null) completedEmpty.set(true);
if (t != null) error.set(t);
invoked.increment();
});

StepVerifier.create(mono)
.expectFusion()
.expectNext(55)
.expectComplete()
.verify();

assertThat(completedEmpty.get()).as("unexpected empty completion").isFalse();
assertThat(invoked.intValue()).isEqualTo(1);
assertThat(error).as("unexpected error").hasValue(null);
}

@Test
public void onAfterSuccessOrErrorNormal() {
LongAdder invoked = new LongAdder();
Expand Down Expand Up @@ -749,30 +647,6 @@ void testCallbacksFusionSync() {
assertThat(errorInvocation).hasValue(null);
}

@Test
void testCallbacksFusionAsync() {
AtomicReference<Integer> successInvocation = new AtomicReference<>();
AtomicReference<Throwable> errorInvocation = new AtomicReference<>();

Mono<Integer> source = Flux
.range(1, 10)
.reduce((a, b) -> a + b);

Mono<Integer> mono = new MonoPeekTerminal<>(source,
successInvocation::set,
errorInvocation::set,
null); //afterTerminate forces the negotiation of fusion mode NONE

StepVerifier.create(mono)
.expectFusion(Fuseable.ASYNC)
.expectNext(55)
.expectComplete()
.verify();

assertThat((Object) successInvocation.get()).isEqualTo(55);
assertThat(errorInvocation).hasValue(null);
}

@Test
public void should_reduce_to_10_events() {
for (int i = 0; i < 20; i++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2015-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,12 +41,12 @@ void normal() {
.publish(o -> o.flatMap(s -> Mono.just(2)));

StepVerifier.create(m)
.expectFusion()
.expectNoFusionSupport()
.expectNext(2)
.verifyComplete();

StepVerifier.create(m)
.expectFusion()
.expectNoFusionSupport()
.expectNext(2)
.verifyComplete();
}
Expand Down Expand Up @@ -139,24 +139,6 @@ void normalCancelBeforeComplete() {
assertThat(Mono.just(Mono.just(1).hide().publish(v -> v)).flatMapMany(v -> v).blockLast()).isEqualTo(1);
}

//see https://github.com/reactor/reactor-core/issues/2600
@Test
void errorFused() {
final String errorMessage = "Error in Mono";
final Mono<Object> source = Mono.error(new RuntimeException(errorMessage));
final Mono<Object> published = source.publish(coordinator -> coordinator.flatMap(Mono::just));

StepVerifier.create(published)
.expectFusion()
.expectErrorMessage(errorMessage)
.verify();

StepVerifier.create(published, StepVerifierOptions.create().scenarioName("second shared invocation"))
.expectFusion()
.expectErrorMessage(errorMessage)
.verify();
}

//see https://github.com/reactor/reactor-core/issues/2600
@Test
void errorHide() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ public void queueSubscriptionSyncRejected() {
assertThat(ds.requestFusion(Fuseable.SYNC)).isEqualTo(Fuseable.NONE);
}

@Test
public void queueSubscriptionAsyncRejected() {
MonoSubscriber<Integer, Integer> ds = new MonoSubscriber<>(new AssertSubscriber<>());

assertThat(ds.requestFusion(Fuseable.ASYNC)).isEqualTo(Fuseable.NONE);
}

@Test
public void clear() {
MonoSubscriber<Integer, Integer> ds = new MonoSubscriber<>(new AssertSubscriber<>());
Expand All @@ -50,7 +57,6 @@ public void clear() {

ds.clear();

assertThat(ds.state).isEqualTo(MonoSubscriber.FUSED_ASYNC_CONSUMED);
assertThat(ds.value).isNull();
}

Expand Down
Loading

0 comments on commit b70f94e

Please sign in to comment.