Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable fusion in MonoSubscriber #3245

Merged
merged 5 commits into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1776,7 +1776,6 @@ public Object scanUnsafe(Attr key) {

@Override
public final void clear() {
STATE.lazySet(this, FUSED_ASYNC_CONSUMED);
this.value = null;
}

Expand All @@ -1790,18 +1789,6 @@ public final void clear() {
public final void complete(@Nullable O v) {
for (; ; ) {
int state = this.state;
if (state == FUSED_ASYNC_EMPTY) {
setValue(v);
//sync memory since setValue is non volatile
if (STATE.compareAndSet(this, FUSED_ASYNC_EMPTY, FUSED_ASYNC_READY)) {
Subscriber<? super O> a = actual;
a.onNext(null);
a.onComplete();
return;
}
//refresh state if race occurred so we test if cancelled in the next comparison
state = this.state;
}

// if state is >= HAS_CANCELLED or bit zero is set (*_HAS_VALUE) case, return
if ((state & ~HAS_REQUEST_NO_VALUE) != 0) {
Expand Down Expand Up @@ -1850,7 +1837,7 @@ public final boolean isCancelled() {

@Override
public final boolean isEmpty() {
return this.state != FUSED_ASYNC_READY;
return true;
}

@Override
Expand All @@ -1877,11 +1864,6 @@ public void onSubscribe(Subscription s) {
@Override
@Nullable
public final O poll() {
if (STATE.compareAndSet(this, FUSED_ASYNC_READY, FUSED_ASYNC_CONSUMED)) {
O v = value;
value = null;
return v;
}
return null;
}

Expand Down Expand Up @@ -1917,10 +1899,6 @@ public void request(long n) {

@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
STATE.lazySet(this, FUSED_ASYNC_EMPTY);
return ASYNC;
}
return NONE;
}

Expand Down Expand Up @@ -1964,18 +1942,7 @@ public int size() {
* Indicates the Subscription has been cancelled.
*/
static final int CANCELLED = 4;
/**
* Indicates this Subscription is in ASYNC fusion mode and is currently empty.
*/
static final int FUSED_ASYNC_EMPTY = 8;
/**
* Indicates this Subscription is in ASYNC fusion mode and has a value.
*/
static final int FUSED_ASYNC_READY = 16;
/**
* Indicates this Subscription is in ASYNC fusion mode and its value has been consumed.
*/
static final int FUSED_ASYNC_CONSUMED = 32;

@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<MonoSubscriber> STATE =
AtomicIntegerFieldUpdater.newUpdater(MonoSubscriber.class, "state");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,17 @@ public class FluxDoOnEachTest {
void doOnEachAsyncFusionDoesntTriggerOnNextTwice() {
List<String> signals = new ArrayList<>();
StepVerifier.create(Flux.just("a", "b", "c")
.collectList()
.limitRate(3)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to arbitrary operator that satisfies .expectFusion(Fuseable.ASYNC)

.doOnEach(sig -> signals.add(sig.toString()))
)
.expectFusion(Fuseable.ASYNC)
.expectNext(Arrays.asList("a", "b", "c"))
.expectNext("a", "b", "c")
.verifyComplete();

assertThat(signals).containsExactly(
"doOnEach_onNext([a, b, c])",
"doOnEach_onNext(a)",
"doOnEach_onNext(b)",
"doOnEach_onNext(c)",
"onComplete()"
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.test.StepVerifier;
import reactor.test.subscriber.AssertSubscriber;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ public void queueSubscriptionSyncRejected() {
assertThat(ds.requestFusion(Fuseable.SYNC)).isEqualTo(Fuseable.NONE);
}

@Test
public void queueSubscriptionAsyncRejected() {
MonoSubscriber<Integer, Integer> ds = new MonoSubscriber<>(new AssertSubscriber<>());

assertThat(ds.requestFusion(Fuseable.ASYNC)).isEqualTo(Fuseable.NONE);
}

@Test
public void clear() {
MonoSubscriber<Integer, Integer> ds = new MonoSubscriber<>(new AssertSubscriber<>());
Expand All @@ -50,7 +57,6 @@ public void clear() {

ds.clear();

assertThat(ds.state).isEqualTo(MonoSubscriber.FUSED_ASYNC_CONSUMED);
assertThat(ds.value).isNull();
}

Expand Down