Skip to content

Commit

Permalink
Merge #3245 into 3.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
chemicL committed Oct 27, 2022
2 parents 5a8a74e + b70f94e commit 17ac48a
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 36 deletions.
37 changes: 2 additions & 35 deletions reactor-core/src/main/java/reactor/core/publisher/Operators.java
Original file line number Diff line number Diff line change
Expand Up @@ -1776,7 +1776,6 @@ public Object scanUnsafe(Attr key) {

@Override
public final void clear() {
STATE.lazySet(this, FUSED_ASYNC_CONSUMED);
this.value = null;
}

Expand All @@ -1790,18 +1789,6 @@ public final void clear() {
public final void complete(@Nullable O v) {
for (; ; ) {
int state = this.state;
if (state == FUSED_ASYNC_EMPTY) {
setValue(v);
//sync memory since setValue is non volatile
if (STATE.compareAndSet(this, FUSED_ASYNC_EMPTY, FUSED_ASYNC_READY)) {
Subscriber<? super O> a = actual;
a.onNext(null);
a.onComplete();
return;
}
//refresh state if race occurred so we test if cancelled in the next comparison
state = this.state;
}

// if state is >= HAS_CANCELLED or bit zero is set (*_HAS_VALUE) case, return
if ((state & ~HAS_REQUEST_NO_VALUE) != 0) {
Expand Down Expand Up @@ -1850,7 +1837,7 @@ public final boolean isCancelled() {

@Override
public final boolean isEmpty() {
return this.state != FUSED_ASYNC_READY;
return true;
}

@Override
Expand All @@ -1877,11 +1864,6 @@ public void onSubscribe(Subscription s) {
@Override
@Nullable
public final O poll() {
if (STATE.compareAndSet(this, FUSED_ASYNC_READY, FUSED_ASYNC_CONSUMED)) {
O v = value;
value = null;
return v;
}
return null;
}

Expand Down Expand Up @@ -1917,10 +1899,6 @@ public void request(long n) {

@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
STATE.lazySet(this, FUSED_ASYNC_EMPTY);
return ASYNC;
}
return NONE;
}

Expand Down Expand Up @@ -1964,18 +1942,7 @@ public int size() {
* Indicates the Subscription has been cancelled.
*/
static final int CANCELLED = 4;
/**
* Indicates this Subscription is in ASYNC fusion mode and is currently empty.
*/
static final int FUSED_ASYNC_EMPTY = 8;
/**
* Indicates this Subscription is in ASYNC fusion mode and has a value.
*/
static final int FUSED_ASYNC_READY = 16;
/**
* Indicates this Subscription is in ASYNC fusion mode and its value has been consumed.
*/
static final int FUSED_ASYNC_CONSUMED = 32;

@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<MonoSubscriber> STATE =
AtomicIntegerFieldUpdater.newUpdater(MonoSubscriber.class, "state");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.test.StepVerifier;
import reactor.test.subscriber.AssertSubscriber;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ public void queueSubscriptionSyncRejected() {
assertThat(ds.requestFusion(Fuseable.SYNC)).isEqualTo(Fuseable.NONE);
}

@Test
public void queueSubscriptionAsyncRejected() {
MonoSubscriber<Integer, Integer> ds = new MonoSubscriber<>(new AssertSubscriber<>());

assertThat(ds.requestFusion(Fuseable.ASYNC)).isEqualTo(Fuseable.NONE);
}

@Test
public void clear() {
MonoSubscriber<Integer, Integer> ds = new MonoSubscriber<>(new AssertSubscriber<>());
Expand All @@ -50,7 +57,6 @@ public void clear() {

ds.clear();

assertThat(ds.state).isEqualTo(MonoSubscriber.FUSED_ASYNC_CONSUMED);
assertThat(ds.value).isNull();
}

Expand Down

0 comments on commit 17ac48a

Please sign in to comment.