diff --git a/reactor-core/src/main/java/reactor/core/publisher/Operators.java b/reactor-core/src/main/java/reactor/core/publisher/Operators.java index 61365fd0f5..89c606a1ed 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Operators.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Operators.java @@ -1776,7 +1776,6 @@ public Object scanUnsafe(Attr key) { @Override public final void clear() { - STATE.lazySet(this, FUSED_ASYNC_CONSUMED); this.value = null; } @@ -1790,18 +1789,6 @@ public final void clear() { public final void complete(@Nullable O v) { for (; ; ) { int state = this.state; - if (state == FUSED_ASYNC_EMPTY) { - setValue(v); - //sync memory since setValue is non volatile - if (STATE.compareAndSet(this, FUSED_ASYNC_EMPTY, FUSED_ASYNC_READY)) { - Subscriber a = actual; - a.onNext(null); - a.onComplete(); - return; - } - //refresh state if race occurred so we test if cancelled in the next comparison - state = this.state; - } // if state is >= HAS_CANCELLED or bit zero is set (*_HAS_VALUE) case, return if ((state & ~HAS_REQUEST_NO_VALUE) != 0) { @@ -1850,7 +1837,7 @@ public final boolean isCancelled() { @Override public final boolean isEmpty() { - return this.state != FUSED_ASYNC_READY; + return true; } @Override @@ -1877,11 +1864,6 @@ public void onSubscribe(Subscription s) { @Override @Nullable public final O poll() { - if (STATE.compareAndSet(this, FUSED_ASYNC_READY, FUSED_ASYNC_CONSUMED)) { - O v = value; - value = null; - return v; - } return null; } @@ -1917,10 +1899,6 @@ public void request(long n) { @Override public int requestFusion(int mode) { - if ((mode & ASYNC) != 0) { - STATE.lazySet(this, FUSED_ASYNC_EMPTY); - return ASYNC; - } return NONE; } @@ -1964,18 +1942,7 @@ public int size() { * Indicates the Subscription has been cancelled. */ static final int CANCELLED = 4; - /** - * Indicates this Subscription is in ASYNC fusion mode and is currently empty. - */ - static final int FUSED_ASYNC_EMPTY = 8; - /** - * Indicates this Subscription is in ASYNC fusion mode and has a value. - */ - static final int FUSED_ASYNC_READY = 16; - /** - * Indicates this Subscription is in ASYNC fusion mode and its value has been consumed. - */ - static final int FUSED_ASYNC_CONSUMED = 32; + @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater.newUpdater(MonoSubscriber.class, "state"); diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoCountTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoCountTest.java index 0ef4eefa25..97418ee5ae 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoCountTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoCountTest.java @@ -20,6 +20,7 @@ import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Scannable; +import reactor.test.StepVerifier; import reactor.test.subscriber.AssertSubscriber; import static org.assertj.core.api.Assertions.assertThat; diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoSubscriberTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoSubscriberTest.java index fd3ca1501f..c7a6318280 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoSubscriberTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoSubscriberTest.java @@ -42,6 +42,13 @@ public void queueSubscriptionSyncRejected() { assertThat(ds.requestFusion(Fuseable.SYNC)).isEqualTo(Fuseable.NONE); } + @Test + public void queueSubscriptionAsyncRejected() { + MonoSubscriber ds = new MonoSubscriber<>(new AssertSubscriber<>()); + + assertThat(ds.requestFusion(Fuseable.ASYNC)).isEqualTo(Fuseable.NONE); + } + @Test public void clear() { MonoSubscriber ds = new MonoSubscriber<>(new AssertSubscriber<>()); @@ -50,7 +57,6 @@ public void clear() { ds.clear(); - assertThat(ds.state).isEqualTo(MonoSubscriber.FUSED_ASYNC_CONSUMED); assertThat(ds.value).isNull(); }