Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable fusion in MonoSubscriber #3245

Merged
merged 5 commits into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1776,7 +1776,6 @@ public Object scanUnsafe(Attr key) {

@Override
public final void clear() {
STATE.lazySet(this, FUSED_ASYNC_CONSUMED);
this.value = null;
}

Expand All @@ -1790,18 +1789,6 @@ public final void clear() {
public final void complete(@Nullable O v) {
for (; ; ) {
int state = this.state;
if (state == FUSED_ASYNC_EMPTY) {
setValue(v);
//sync memory since setValue is non volatile
if (STATE.compareAndSet(this, FUSED_ASYNC_EMPTY, FUSED_ASYNC_READY)) {
Subscriber<? super O> a = actual;
a.onNext(null);
a.onComplete();
return;
}
//refresh state if race occurred so we test if cancelled in the next comparison
state = this.state;
}

// if state is >= HAS_CANCELLED or bit zero is set (*_HAS_VALUE) case, return
if ((state & ~HAS_REQUEST_NO_VALUE) != 0) {
Expand Down Expand Up @@ -1850,7 +1837,7 @@ public final boolean isCancelled() {

@Override
public final boolean isEmpty() {
return this.state != FUSED_ASYNC_READY;
return true;
}

@Override
Expand All @@ -1877,11 +1864,6 @@ public void onSubscribe(Subscription s) {
@Override
@Nullable
public final O poll() {
if (STATE.compareAndSet(this, FUSED_ASYNC_READY, FUSED_ASYNC_CONSUMED)) {
O v = value;
value = null;
return v;
}
return null;
}

Expand Down Expand Up @@ -1917,10 +1899,6 @@ public void request(long n) {

@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
STATE.lazySet(this, FUSED_ASYNC_EMPTY);
return ASYNC;
}
return NONE;
}

Expand Down Expand Up @@ -1964,18 +1942,7 @@ public int size() {
* Indicates the Subscription has been cancelled.
*/
static final int CANCELLED = 4;
/**
* Indicates this Subscription is in ASYNC fusion mode and is currently empty.
*/
static final int FUSED_ASYNC_EMPTY = 8;
/**
* Indicates this Subscription is in ASYNC fusion mode and has a value.
*/
static final int FUSED_ASYNC_READY = 16;
/**
* Indicates this Subscription is in ASYNC fusion mode and its value has been consumed.
*/
static final int FUSED_ASYNC_CONSUMED = 32;

@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<MonoSubscriber> STATE =
AtomicIntegerFieldUpdater.newUpdater(MonoSubscriber.class, "state");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,17 @@ public class FluxDoOnEachTest {
void doOnEachAsyncFusionDoesntTriggerOnNextTwice() {
List<String> signals = new ArrayList<>();
StepVerifier.create(Flux.just("a", "b", "c")
.collectList()
.limitRate(3)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to arbitrary operator that satisfies .expectFusion(Fuseable.ASYNC)

.doOnEach(sig -> signals.add(sig.toString()))
)
.expectFusion(Fuseable.ASYNC)
.expectNext(Arrays.asList("a", "b", "c"))
.expectNext("a", "b", "c")
.verifyComplete();

assertThat(signals).containsExactly(
"doOnEach_onNext([a, b, c])",
"doOnEach_onNext(a)",
"doOnEach_onNext(b)",
"doOnEach_onNext(c)",
"onComplete()"
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,23 +221,6 @@ public void discardElementAndBufferOnAccumulatorLateFailure() {
.hasDiscardedExactly(1, 2, 3);
}

@Test
public void discardElementAndBufferOnAccumulatorLateFailure_fused() {
Flux.just(1, 2, 3, 4)
.collect(ArrayList::new, (l, t) -> {
if (t == 3) {
throw new IllegalStateException("accumulator: boom");
}
l.add(t);
})
.as(StepVerifier::create)
//WARNING: we need to request fusion so this expectFusion is important
.expectFusion(Fuseable.ASYNC)
.expectErrorMessage("accumulator: boom")
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3);
}

@Test
public void discardListElementsOnError() {
Mono<List<Integer>> test =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2015-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.test.StepVerifier;
import reactor.test.subscriber.AssertSubscriber;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,53 +90,6 @@ public void onSuccessNormalConditional() {

}

@Test
public void onSuccessFusion() {
LongAdder invoked = new LongAdder();
AtomicBoolean hasNull = new AtomicBoolean();

Mono<Integer> mono = Flux
.range(1, 10)
.reduce((a, b) -> a + b)
.doOnSuccess(v -> {
if (v == null) hasNull.set(true);
invoked.increment();
});

StepVerifier.create(mono)
.expectFusion(Fuseable.ASYNC)
.expectNext(55)
.expectComplete()
.verify();

assertThat(hasNull.get()).as("unexpected call to onSuccess with null").isFalse();
assertThat(invoked.intValue()).isEqualTo(1);
}

@Test
public void onSuccessFusionConditional() {
LongAdder invoked = new LongAdder();
AtomicBoolean hasNull = new AtomicBoolean();

Mono<Integer> mono = Flux
.range(1, 10)
.reduce((a, b) -> a + b)
.filter(v -> true)
.doOnSuccess(v -> {
if (v == null) hasNull.set(true);
invoked.increment();
});

StepVerifier.create(mono)
.expectFusion()
.expectNext(55)
.expectComplete()
.verify();

assertThat(hasNull.get()).as("unexpected call to onSuccess with null").isFalse();
assertThat(invoked.intValue()).isEqualTo(1);
}

@Test
public void onSuccessOrErrorNormal() {
LongAdder invoked = new LongAdder();
Expand Down Expand Up @@ -194,61 +147,6 @@ public void onSuccessOrErrorNormalConditional() {
assertThat(error).as("unexpected error").hasValue(null);
}

@Test
public void onSuccessOrErrorFusion() {
LongAdder invoked = new LongAdder();
AtomicBoolean completedEmpty = new AtomicBoolean();
AtomicReference<Throwable> error = new AtomicReference<>();

@SuppressWarnings("deprecation") // Because of doOnSuccessOrError, which will be removed in 3.5.0
Mono<Integer> mono = Flux
.range(1, 10)
.reduce((a, b) -> a + b)
.doOnSuccessOrError((v, t) -> {
if (v == null && t == null) completedEmpty.set(true);
if (t != null) error.set(t);
invoked.increment();
});

StepVerifier.create(mono)
.expectFusion()
.expectNext(55)
.expectComplete()
.verify();

assertThat(completedEmpty.get()).as("unexpected empty completion").isFalse();
assertThat(invoked.intValue()).isEqualTo(1);
assertThat(error).as("unexpected error").hasValue(null);
}

@Test
public void onSuccessOrErrorFusionConditional() {
LongAdder invoked = new LongAdder();
AtomicBoolean completedEmpty = new AtomicBoolean();
AtomicReference<Throwable> error = new AtomicReference<>();

@SuppressWarnings("deprecation") // Because of doOnSuccessOrError, which will be removed in 3.5.0
Mono<Integer> mono = Flux
.range(1, 10)
.reduce((a, b) -> a + b)
.filter(v -> true)
.doOnSuccessOrError((v, t) -> {
if (v == null && t == null) completedEmpty.set(true);
if (t != null) error.set(t);
invoked.increment();
});

StepVerifier.create(mono)
.expectFusion()
.expectNext(55)
.expectComplete()
.verify();

assertThat(completedEmpty.get()).as("unexpected empty completion").isFalse();
assertThat(invoked.intValue()).isEqualTo(1);
assertThat(error).as("unexpected error").hasValue(null);
}

@Test
public void onAfterSuccessOrErrorNormal() {
LongAdder invoked = new LongAdder();
Expand Down Expand Up @@ -749,30 +647,6 @@ void testCallbacksFusionSync() {
assertThat(errorInvocation).hasValue(null);
}

@Test
void testCallbacksFusionAsync() {
AtomicReference<Integer> successInvocation = new AtomicReference<>();
AtomicReference<Throwable> errorInvocation = new AtomicReference<>();

Mono<Integer> source = Flux
.range(1, 10)
.reduce((a, b) -> a + b);

Mono<Integer> mono = new MonoPeekTerminal<>(source,
successInvocation::set,
errorInvocation::set,
null); //afterTerminate forces the negotiation of fusion mode NONE

StepVerifier.create(mono)
.expectFusion(Fuseable.ASYNC)
.expectNext(55)
.expectComplete()
.verify();

assertThat((Object) successInvocation.get()).isEqualTo(55);
assertThat(errorInvocation).hasValue(null);
}

@Test
public void should_reduce_to_10_events() {
for (int i = 0; i < 20; i++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2015-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,12 +41,12 @@ void normal() {
.publish(o -> o.flatMap(s -> Mono.just(2)));

StepVerifier.create(m)
.expectFusion()
.expectNoFusionSupport()
.expectNext(2)
.verifyComplete();

StepVerifier.create(m)
.expectFusion()
.expectNoFusionSupport()
.expectNext(2)
.verifyComplete();
}
Expand Down Expand Up @@ -139,24 +139,6 @@ void normalCancelBeforeComplete() {
assertThat(Mono.just(Mono.just(1).hide().publish(v -> v)).flatMapMany(v -> v).blockLast()).isEqualTo(1);
}

//see https://github.com/reactor/reactor-core/issues/2600
@Test
void errorFused() {
final String errorMessage = "Error in Mono";
final Mono<Object> source = Mono.error(new RuntimeException(errorMessage));
final Mono<Object> published = source.publish(coordinator -> coordinator.flatMap(Mono::just));

StepVerifier.create(published)
.expectFusion()
.expectErrorMessage(errorMessage)
.verify();

StepVerifier.create(published, StepVerifierOptions.create().scenarioName("second shared invocation"))
.expectFusion()
.expectErrorMessage(errorMessage)
.verify();
}

//see https://github.com/reactor/reactor-core/issues/2600
@Test
void errorHide() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ public void queueSubscriptionSyncRejected() {
assertThat(ds.requestFusion(Fuseable.SYNC)).isEqualTo(Fuseable.NONE);
}

@Test
public void queueSubscriptionAsyncRejected() {
MonoSubscriber<Integer, Integer> ds = new MonoSubscriber<>(new AssertSubscriber<>());

assertThat(ds.requestFusion(Fuseable.ASYNC)).isEqualTo(Fuseable.NONE);
}

@Test
public void clear() {
MonoSubscriber<Integer, Integer> ds = new MonoSubscriber<>(new AssertSubscriber<>());
Expand All @@ -50,7 +57,6 @@ public void clear() {

ds.clear();

assertThat(ds.state).isEqualTo(MonoSubscriber.FUSED_ASYNC_CONSUMED);
assertThat(ds.value).isNull();
}

Expand Down
Loading