diff --git a/reactor-core/src/main/java/reactor/core/publisher/Operators.java b/reactor-core/src/main/java/reactor/core/publisher/Operators.java index 18ec9c4979..9fd36ad980 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Operators.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Operators.java @@ -1776,7 +1776,6 @@ public Object scanUnsafe(Attr key) { @Override public final void clear() { - STATE.lazySet(this, FUSED_ASYNC_CONSUMED); this.value = null; } @@ -1790,18 +1789,6 @@ public final void clear() { public final void complete(@Nullable O v) { for (; ; ) { int state = this.state; - if (state == FUSED_ASYNC_EMPTY) { - setValue(v); - //sync memory since setValue is non volatile - if (STATE.compareAndSet(this, FUSED_ASYNC_EMPTY, FUSED_ASYNC_READY)) { - Subscriber a = actual; - a.onNext(null); - a.onComplete(); - return; - } - //refresh state if race occurred so we test if cancelled in the next comparison - state = this.state; - } // if state is >= HAS_CANCELLED or bit zero is set (*_HAS_VALUE) case, return if ((state & ~HAS_REQUEST_NO_VALUE) != 0) { @@ -1850,7 +1837,7 @@ public final boolean isCancelled() { @Override public final boolean isEmpty() { - return this.state != FUSED_ASYNC_READY; + return true; } @Override @@ -1877,11 +1864,6 @@ public void onSubscribe(Subscription s) { @Override @Nullable public final O poll() { - if (STATE.compareAndSet(this, FUSED_ASYNC_READY, FUSED_ASYNC_CONSUMED)) { - O v = value; - value = null; - return v; - } return null; } @@ -1917,10 +1899,6 @@ public void request(long n) { @Override public int requestFusion(int mode) { - if ((mode & ASYNC) != 0) { - STATE.lazySet(this, FUSED_ASYNC_EMPTY); - return ASYNC; - } return NONE; } @@ -1964,18 +1942,7 @@ public int size() { * Indicates the Subscription has been cancelled. */ static final int CANCELLED = 4; - /** - * Indicates this Subscription is in ASYNC fusion mode and is currently empty. - */ - static final int FUSED_ASYNC_EMPTY = 8; - /** - * Indicates this Subscription is in ASYNC fusion mode and has a value. - */ - static final int FUSED_ASYNC_READY = 16; - /** - * Indicates this Subscription is in ASYNC fusion mode and its value has been consumed. - */ - static final int FUSED_ASYNC_CONSUMED = 32; + @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater.newUpdater(MonoSubscriber.class, "state"); diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxDoOnEachTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxDoOnEachTest.java index 241dcbec0b..6abfca1918 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxDoOnEachTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxDoOnEachTest.java @@ -58,15 +58,17 @@ public class FluxDoOnEachTest { void doOnEachAsyncFusionDoesntTriggerOnNextTwice() { List signals = new ArrayList<>(); StepVerifier.create(Flux.just("a", "b", "c") - .collectList() + .limitRate(3) .doOnEach(sig -> signals.add(sig.toString())) ) .expectFusion(Fuseable.ASYNC) - .expectNext(Arrays.asList("a", "b", "c")) + .expectNext("a", "b", "c") .verifyComplete(); assertThat(signals).containsExactly( - "doOnEach_onNext([a, b, c])", + "doOnEach_onNext(a)", + "doOnEach_onNext(b)", + "doOnEach_onNext(c)", "onComplete()" ); } diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoCollectTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoCollectTest.java index 183d2862de..0c1cc688d7 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoCollectTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoCollectTest.java @@ -221,23 +221,6 @@ public void discardElementAndBufferOnAccumulatorLateFailure() { .hasDiscardedExactly(1, 2, 3); } - @Test - public void discardElementAndBufferOnAccumulatorLateFailure_fused() { - Flux.just(1, 2, 3, 4) - .collect(ArrayList::new, (l, t) -> { - if (t == 3) { - throw new IllegalStateException("accumulator: boom"); - } - l.add(t); - }) - .as(StepVerifier::create) - //WARNING: we need to request fusion so this expectFusion is important - .expectFusion(Fuseable.ASYNC) - .expectErrorMessage("accumulator: boom") - .verifyThenAssertThat() - .hasDiscardedExactly(1, 2, 3); - } - @Test public void discardListElementsOnError() { Mono> test = diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoCountTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoCountTest.java index 6c55573718..3a46dd0f31 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoCountTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoCountTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2015-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Scannable; +import reactor.test.StepVerifier; import reactor.test.subscriber.AssertSubscriber; import static org.assertj.core.api.Assertions.assertThat; diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoPeekAfterTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoPeekAfterTest.java index 8166af5e56..baba52b171 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoPeekAfterTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoPeekAfterTest.java @@ -90,53 +90,6 @@ public void onSuccessNormalConditional() { } - @Test - public void onSuccessFusion() { - LongAdder invoked = new LongAdder(); - AtomicBoolean hasNull = new AtomicBoolean(); - - Mono mono = Flux - .range(1, 10) - .reduce((a, b) -> a + b) - .doOnSuccess(v -> { - if (v == null) hasNull.set(true); - invoked.increment(); - }); - - StepVerifier.create(mono) - .expectFusion(Fuseable.ASYNC) - .expectNext(55) - .expectComplete() - .verify(); - - assertThat(hasNull.get()).as("unexpected call to onSuccess with null").isFalse(); - assertThat(invoked.intValue()).isEqualTo(1); - } - - @Test - public void onSuccessFusionConditional() { - LongAdder invoked = new LongAdder(); - AtomicBoolean hasNull = new AtomicBoolean(); - - Mono mono = Flux - .range(1, 10) - .reduce((a, b) -> a + b) - .filter(v -> true) - .doOnSuccess(v -> { - if (v == null) hasNull.set(true); - invoked.increment(); - }); - - StepVerifier.create(mono) - .expectFusion() - .expectNext(55) - .expectComplete() - .verify(); - - assertThat(hasNull.get()).as("unexpected call to onSuccess with null").isFalse(); - assertThat(invoked.intValue()).isEqualTo(1); - } - @Test public void onSuccessOrErrorNormal() { LongAdder invoked = new LongAdder(); @@ -194,61 +147,6 @@ public void onSuccessOrErrorNormalConditional() { assertThat(error).as("unexpected error").hasValue(null); } - @Test - public void onSuccessOrErrorFusion() { - LongAdder invoked = new LongAdder(); - AtomicBoolean completedEmpty = new AtomicBoolean(); - AtomicReference error = new AtomicReference<>(); - - @SuppressWarnings("deprecation") // Because of doOnSuccessOrError, which will be removed in 3.5.0 - Mono mono = Flux - .range(1, 10) - .reduce((a, b) -> a + b) - .doOnSuccessOrError((v, t) -> { - if (v == null && t == null) completedEmpty.set(true); - if (t != null) error.set(t); - invoked.increment(); - }); - - StepVerifier.create(mono) - .expectFusion() - .expectNext(55) - .expectComplete() - .verify(); - - assertThat(completedEmpty.get()).as("unexpected empty completion").isFalse(); - assertThat(invoked.intValue()).isEqualTo(1); - assertThat(error).as("unexpected error").hasValue(null); - } - - @Test - public void onSuccessOrErrorFusionConditional() { - LongAdder invoked = new LongAdder(); - AtomicBoolean completedEmpty = new AtomicBoolean(); - AtomicReference error = new AtomicReference<>(); - - @SuppressWarnings("deprecation") // Because of doOnSuccessOrError, which will be removed in 3.5.0 - Mono mono = Flux - .range(1, 10) - .reduce((a, b) -> a + b) - .filter(v -> true) - .doOnSuccessOrError((v, t) -> { - if (v == null && t == null) completedEmpty.set(true); - if (t != null) error.set(t); - invoked.increment(); - }); - - StepVerifier.create(mono) - .expectFusion() - .expectNext(55) - .expectComplete() - .verify(); - - assertThat(completedEmpty.get()).as("unexpected empty completion").isFalse(); - assertThat(invoked.intValue()).isEqualTo(1); - assertThat(error).as("unexpected error").hasValue(null); - } - @Test public void onAfterSuccessOrErrorNormal() { LongAdder invoked = new LongAdder(); @@ -749,30 +647,6 @@ void testCallbacksFusionSync() { assertThat(errorInvocation).hasValue(null); } - @Test - void testCallbacksFusionAsync() { - AtomicReference successInvocation = new AtomicReference<>(); - AtomicReference errorInvocation = new AtomicReference<>(); - - Mono source = Flux - .range(1, 10) - .reduce((a, b) -> a + b); - - Mono mono = new MonoPeekTerminal<>(source, - successInvocation::set, - errorInvocation::set, - null); //afterTerminate forces the negotiation of fusion mode NONE - - StepVerifier.create(mono) - .expectFusion(Fuseable.ASYNC) - .expectNext(55) - .expectComplete() - .verify(); - - assertThat((Object) successInvocation.get()).isEqualTo(55); - assertThat(errorInvocation).hasValue(null); - } - @Test public void should_reduce_to_10_events() { for (int i = 0; i < 20; i++) { diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoPublishMulticastTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoPublishMulticastTest.java index b4697274b1..b5d522f186 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoPublishMulticastTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoPublishMulticastTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2015-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,12 +41,12 @@ void normal() { .publish(o -> o.flatMap(s -> Mono.just(2))); StepVerifier.create(m) - .expectFusion() + .expectNoFusionSupport() .expectNext(2) .verifyComplete(); StepVerifier.create(m) - .expectFusion() + .expectNoFusionSupport() .expectNext(2) .verifyComplete(); } @@ -139,24 +139,6 @@ void normalCancelBeforeComplete() { assertThat(Mono.just(Mono.just(1).hide().publish(v -> v)).flatMapMany(v -> v).blockLast()).isEqualTo(1); } - //see https://github.com/reactor/reactor-core/issues/2600 - @Test - void errorFused() { - final String errorMessage = "Error in Mono"; - final Mono source = Mono.error(new RuntimeException(errorMessage)); - final Mono published = source.publish(coordinator -> coordinator.flatMap(Mono::just)); - - StepVerifier.create(published) - .expectFusion() - .expectErrorMessage(errorMessage) - .verify(); - - StepVerifier.create(published, StepVerifierOptions.create().scenarioName("second shared invocation")) - .expectFusion() - .expectErrorMessage(errorMessage) - .verify(); - } - //see https://github.com/reactor/reactor-core/issues/2600 @Test void errorHide() { diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoSubscriberTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoSubscriberTest.java index fd3ca1501f..c7a6318280 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoSubscriberTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoSubscriberTest.java @@ -42,6 +42,13 @@ public void queueSubscriptionSyncRejected() { assertThat(ds.requestFusion(Fuseable.SYNC)).isEqualTo(Fuseable.NONE); } + @Test + public void queueSubscriptionAsyncRejected() { + MonoSubscriber ds = new MonoSubscriber<>(new AssertSubscriber<>()); + + assertThat(ds.requestFusion(Fuseable.ASYNC)).isEqualTo(Fuseable.NONE); + } + @Test public void clear() { MonoSubscriber ds = new MonoSubscriber<>(new AssertSubscriber<>()); @@ -50,7 +57,6 @@ public void clear() { ds.clear(); - assertThat(ds.state).isEqualTo(MonoSubscriber.FUSED_ASYNC_CONSUMED); assertThat(ds.value).isNull(); } diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoUsingTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoUsingTest.java index de6d277e3b..c34a174f86 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoUsingTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoUsingTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -337,25 +337,6 @@ public void smokeTestMapReduceGuardedByCleanup_normalEager() { assertThat(cleaned).isTrue(); } - @Test - public void smokeTestMapReduceGuardedByCleanup_fusedEager() { - AtomicBoolean cleaned = new AtomicBoolean(); - Mono.using(() -> cleaned, - ab -> Flux.just("foo", "bar", "baz") - .delayElements(Duration.ofMillis(100)) - .count() - .map(i -> "" + i + ab.get()), - ab -> ab.set(true), - true) - .as(StepVerifier::create) - .expectFusion() - .expectNext("3false") - .expectComplete() - .verify(); - - assertThat(cleaned).isTrue(); - } - @Test public void smokeTestMapReduceGuardedByCleanup_normalNotEager() { AtomicBoolean cleaned = new AtomicBoolean(); @@ -379,28 +360,6 @@ public void smokeTestMapReduceGuardedByCleanup_normalNotEager() { .untilAsserted(assertThat(cleaned)::isTrue); } - @Test - public void smokeTestMapReduceGuardedByCleanup_fusedNotEager() { - AtomicBoolean cleaned = new AtomicBoolean(); - Mono.using(() -> cleaned, - ab -> Flux.just("foo", "bar", "baz") - .delayElements(Duration.ofMillis(100)) - .count() - .map(i -> "" + i + ab.get()), - ab -> ab.set(true), - false) - .as(StepVerifier::create) - .expectFusion() - .expectNext("3false") - .expectComplete() - .verify(); - - //since the handler is executed after onComplete, we allow some delay - await().atMost(100, TimeUnit.MILLISECONDS) - .with().pollInterval(10, TimeUnit.MILLISECONDS) - .untilAsserted(assertThat(cleaned)::isTrue); - } - @Test public void scanOperator(){ MonoUsing test = new MonoUsing<>(() -> 1, r -> Mono.just(1), c -> {}, false); diff --git a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/MonoMetricsFuseableTest.java b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/MonoMetricsFuseableTest.java index e7ef429661..bebe89d11b 100644 --- a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/MonoMetricsFuseableTest.java +++ b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/MonoMetricsFuseableTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2018-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -327,32 +327,6 @@ public void completeEmptyNoFusion() { .satisfies(timer -> assertThat(timer.count()).as("timer count").isOne()); } - @Test - public void completeEmptyAsyncFusion() { - Mono source = Mono.fromCallable(() -> null); - - StepVerifier.create(new MonoMetricsFuseable<>(source)) - .expectFusion(Fuseable.ASYNC) - .verifyComplete(); - - Timer stcCompleteCounter = registry.find(REACTOR_DEFAULT_NAME + METER_FLOW_DURATION) - .tags(Tags.of(TAG_ON_COMPLETE)) - .timer(); - - Timer stcCompleteEmptyCounter = registry.find(REACTOR_DEFAULT_NAME + METER_FLOW_DURATION) - .tags(Tags.of(TAG_ON_COMPLETE_EMPTY)) - .timer(); - - assertThat(stcCompleteCounter) - .as("complete with element") - .isNull(); - - assertThat(stcCompleteEmptyCounter) - .as("complete without any element") - .isNotNull() - .satisfies(timer -> assertThat(timer.count()).as("timer count").isOne()); - } - @Test public void completeEmptySyncFusion() { MonoMetricsFuseable.MetricsFuseableSubscriber subscriber = @@ -409,33 +383,6 @@ public void completeWithElementNoFusion() { .isNull(); } - @Test - public void completeWithElementAsyncFusion() { - Mono source = Mono.fromCallable(() -> 1); - - StepVerifier.create(new MonoMetricsFuseable<>(source)) - .expectFusion(Fuseable.ASYNC) - .expectNext(1) - .verifyComplete(); - - Timer stcCompleteCounter = registry.find(REACTOR_DEFAULT_NAME + METER_FLOW_DURATION) - .tags(Tags.of(TAG_ON_COMPLETE)) - .timer(); - - Timer stcCompleteEmptyCounter = registry.find(REACTOR_DEFAULT_NAME + METER_FLOW_DURATION) - .tags(Tags.of(TAG_ON_COMPLETE_EMPTY)) - .timer(); - - assertThat(stcCompleteCounter) - .as("complete with element") - .isNotNull() - .satisfies(timer -> assertThat(timer.count()).as("timer count").isOne()); - - assertThat(stcCompleteEmptyCounter) - .as("complete without any element") - .isNull(); - } - @Test public void completeWithElementSyncFusion() { Mono source = Mono.just(1); @@ -463,46 +410,6 @@ public void completeWithElementSyncFusion() { .isNull(); } - @Test - public void subscribeToCompleteFuseable() { - Mono source = Mono.fromCallable(() -> { - Thread.sleep(100); - return "foo"; - }); - - StepVerifier.create(new MonoMetricsFuseable<>(source)) - .expectFusion(Fuseable.ASYNC) - .expectNext("foo") - .verifyComplete(); - - - Timer stcCompleteTimer = registry.find(REACTOR_DEFAULT_NAME + METER_FLOW_DURATION) - .tags(Tags.of(TAG_ON_COMPLETE)) - .timer(); - - Timer stcErrorTimer = registry.find(REACTOR_DEFAULT_NAME + METER_FLOW_DURATION) - .tags(Tags.of(TAG_ON_ERROR)) - .timer(); - - Timer stcCancelTimer = registry.find(REACTOR_DEFAULT_NAME + METER_FLOW_DURATION) - .tags(Tags.of(TAG_CANCEL)) - .timer(); - - SoftAssertions.assertSoftly(softly -> { - softly.assertThat(stcCompleteTimer.max(TimeUnit.MILLISECONDS)) - .as("subscribe to complete timer") - .isGreaterThanOrEqualTo(100); - - softly.assertThat(stcErrorTimer) - .as("subscribe to error timer lazily registered") - .isNull(); - - softly.assertThat(stcCancelTimer) - .as("subscribe to cancel timer") - .isNull(); - }); - } - @Test public void subscribeToErrorFuseable() { Mono source = Mono.delay(Duration.ofMillis(100))