From b70f94e6758d16aa0b9f285067ca18d5c689ca9e Mon Sep 17 00:00:00 2001 From: Ugnius Rumsevicius <33397860+UgiR@users.noreply.github.com> Date: Thu, 27 Oct 2022 11:54:00 -0400 Subject: [PATCH] Removing fusion from MonoSubscriber (#3245) Since Mono deals with at most one item, a notion of a queue is not justified. MonoSubscriber supported the notion of async fusion, which requires unnecessary operations to support it. This contributes to a performance degradation instead of improving it. Async fusion has been removed in 3.5.0 line as part of a rework of Mono stack to make them more lazy (compute result after request has been issued, instead of at subscription time). The asynchronous fusion aspect can be also removed as part of the 3.4.x release line to improve performance of existing usages, as it does not change the behaviour nor the APIs. Fixes #3239 --- .../reactor/core/publisher/Operators.java | 37 +---- .../core/publisher/FluxDoOnEachTest.java | 8 +- .../core/publisher/MonoCollectTest.java | 17 --- .../reactor/core/publisher/MonoCountTest.java | 3 +- .../core/publisher/MonoPeekAfterTest.java | 126 ------------------ .../publisher/MonoPublishMulticastTest.java | 24 +--- .../core/publisher/MonoSubscriberTest.java | 8 +- .../reactor/core/publisher/MonoUsingTest.java | 43 +----- .../publisher/MonoMetricsFuseableTest.java | 95 +------------ 9 files changed, 21 insertions(+), 340 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/Operators.java b/reactor-core/src/main/java/reactor/core/publisher/Operators.java index 18ec9c4979..9fd36ad980 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Operators.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Operators.java @@ -1776,7 +1776,6 @@ public Object scanUnsafe(Attr key) { @Override public final void clear() { - STATE.lazySet(this, FUSED_ASYNC_CONSUMED); this.value = null; } @@ -1790,18 +1789,6 @@ public final void clear() { public final void complete(@Nullable O v) { for (; ; ) { int state = this.state; - if (state == FUSED_ASYNC_EMPTY) { - setValue(v); - //sync memory since setValue is non volatile - if (STATE.compareAndSet(this, FUSED_ASYNC_EMPTY, FUSED_ASYNC_READY)) { - Subscriber a = actual; - a.onNext(null); - a.onComplete(); - return; - } - //refresh state if race occurred so we test if cancelled in the next comparison - state = this.state; - } // if state is >= HAS_CANCELLED or bit zero is set (*_HAS_VALUE) case, return if ((state & ~HAS_REQUEST_NO_VALUE) != 0) { @@ -1850,7 +1837,7 @@ public final boolean isCancelled() { @Override public final boolean isEmpty() { - return this.state != FUSED_ASYNC_READY; + return true; } @Override @@ -1877,11 +1864,6 @@ public void onSubscribe(Subscription s) { @Override @Nullable public final O poll() { - if (STATE.compareAndSet(this, FUSED_ASYNC_READY, FUSED_ASYNC_CONSUMED)) { - O v = value; - value = null; - return v; - } return null; } @@ -1917,10 +1899,6 @@ public void request(long n) { @Override public int requestFusion(int mode) { - if ((mode & ASYNC) != 0) { - STATE.lazySet(this, FUSED_ASYNC_EMPTY); - return ASYNC; - } return NONE; } @@ -1964,18 +1942,7 @@ public int size() { * Indicates the Subscription has been cancelled. */ static final int CANCELLED = 4; - /** - * Indicates this Subscription is in ASYNC fusion mode and is currently empty. - */ - static final int FUSED_ASYNC_EMPTY = 8; - /** - * Indicates this Subscription is in ASYNC fusion mode and has a value. - */ - static final int FUSED_ASYNC_READY = 16; - /** - * Indicates this Subscription is in ASYNC fusion mode and its value has been consumed. - */ - static final int FUSED_ASYNC_CONSUMED = 32; + @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater.newUpdater(MonoSubscriber.class, "state"); diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxDoOnEachTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxDoOnEachTest.java index 241dcbec0b..6abfca1918 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxDoOnEachTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxDoOnEachTest.java @@ -58,15 +58,17 @@ public class FluxDoOnEachTest { void doOnEachAsyncFusionDoesntTriggerOnNextTwice() { List signals = new ArrayList<>(); StepVerifier.create(Flux.just("a", "b", "c") - .collectList() + .limitRate(3) .doOnEach(sig -> signals.add(sig.toString())) ) .expectFusion(Fuseable.ASYNC) - .expectNext(Arrays.asList("a", "b", "c")) + .expectNext("a", "b", "c") .verifyComplete(); assertThat(signals).containsExactly( - "doOnEach_onNext([a, b, c])", + "doOnEach_onNext(a)", + "doOnEach_onNext(b)", + "doOnEach_onNext(c)", "onComplete()" ); } diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoCollectTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoCollectTest.java index 183d2862de..0c1cc688d7 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoCollectTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoCollectTest.java @@ -221,23 +221,6 @@ public void discardElementAndBufferOnAccumulatorLateFailure() { .hasDiscardedExactly(1, 2, 3); } - @Test - public void discardElementAndBufferOnAccumulatorLateFailure_fused() { - Flux.just(1, 2, 3, 4) - .collect(ArrayList::new, (l, t) -> { - if (t == 3) { - throw new IllegalStateException("accumulator: boom"); - } - l.add(t); - }) - .as(StepVerifier::create) - //WARNING: we need to request fusion so this expectFusion is important - .expectFusion(Fuseable.ASYNC) - .expectErrorMessage("accumulator: boom") - .verifyThenAssertThat() - .hasDiscardedExactly(1, 2, 3); - } - @Test public void discardListElementsOnError() { Mono> test = diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoCountTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoCountTest.java index 6c55573718..3a46dd0f31 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoCountTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoCountTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2015-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Scannable; +import reactor.test.StepVerifier; import reactor.test.subscriber.AssertSubscriber; import static org.assertj.core.api.Assertions.assertThat; diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoPeekAfterTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoPeekAfterTest.java index 8166af5e56..baba52b171 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoPeekAfterTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoPeekAfterTest.java @@ -90,53 +90,6 @@ public void onSuccessNormalConditional() { } - @Test - public void onSuccessFusion() { - LongAdder invoked = new LongAdder(); - AtomicBoolean hasNull = new AtomicBoolean(); - - Mono mono = Flux - .range(1, 10) - .reduce((a, b) -> a + b) - .doOnSuccess(v -> { - if (v == null) hasNull.set(true); - invoked.increment(); - }); - - StepVerifier.create(mono) - .expectFusion(Fuseable.ASYNC) - .expectNext(55) - .expectComplete() - .verify(); - - assertThat(hasNull.get()).as("unexpected call to onSuccess with null").isFalse(); - assertThat(invoked.intValue()).isEqualTo(1); - } - - @Test - public void onSuccessFusionConditional() { - LongAdder invoked = new LongAdder(); - AtomicBoolean hasNull = new AtomicBoolean(); - - Mono mono = Flux - .range(1, 10) - .reduce((a, b) -> a + b) - .filter(v -> true) - .doOnSuccess(v -> { - if (v == null) hasNull.set(true); - invoked.increment(); - }); - - StepVerifier.create(mono) - .expectFusion() - .expectNext(55) - .expectComplete() - .verify(); - - assertThat(hasNull.get()).as("unexpected call to onSuccess with null").isFalse(); - assertThat(invoked.intValue()).isEqualTo(1); - } - @Test public void onSuccessOrErrorNormal() { LongAdder invoked = new LongAdder(); @@ -194,61 +147,6 @@ public void onSuccessOrErrorNormalConditional() { assertThat(error).as("unexpected error").hasValue(null); } - @Test - public void onSuccessOrErrorFusion() { - LongAdder invoked = new LongAdder(); - AtomicBoolean completedEmpty = new AtomicBoolean(); - AtomicReference error = new AtomicReference<>(); - - @SuppressWarnings("deprecation") // Because of doOnSuccessOrError, which will be removed in 3.5.0 - Mono mono = Flux - .range(1, 10) - .reduce((a, b) -> a + b) - .doOnSuccessOrError((v, t) -> { - if (v == null && t == null) completedEmpty.set(true); - if (t != null) error.set(t); - invoked.increment(); - }); - - StepVerifier.create(mono) - .expectFusion() - .expectNext(55) - .expectComplete() - .verify(); - - assertThat(completedEmpty.get()).as("unexpected empty completion").isFalse(); - assertThat(invoked.intValue()).isEqualTo(1); - assertThat(error).as("unexpected error").hasValue(null); - } - - @Test - public void onSuccessOrErrorFusionConditional() { - LongAdder invoked = new LongAdder(); - AtomicBoolean completedEmpty = new AtomicBoolean(); - AtomicReference error = new AtomicReference<>(); - - @SuppressWarnings("deprecation") // Because of doOnSuccessOrError, which will be removed in 3.5.0 - Mono mono = Flux - .range(1, 10) - .reduce((a, b) -> a + b) - .filter(v -> true) - .doOnSuccessOrError((v, t) -> { - if (v == null && t == null) completedEmpty.set(true); - if (t != null) error.set(t); - invoked.increment(); - }); - - StepVerifier.create(mono) - .expectFusion() - .expectNext(55) - .expectComplete() - .verify(); - - assertThat(completedEmpty.get()).as("unexpected empty completion").isFalse(); - assertThat(invoked.intValue()).isEqualTo(1); - assertThat(error).as("unexpected error").hasValue(null); - } - @Test public void onAfterSuccessOrErrorNormal() { LongAdder invoked = new LongAdder(); @@ -749,30 +647,6 @@ void testCallbacksFusionSync() { assertThat(errorInvocation).hasValue(null); } - @Test - void testCallbacksFusionAsync() { - AtomicReference successInvocation = new AtomicReference<>(); - AtomicReference errorInvocation = new AtomicReference<>(); - - Mono source = Flux - .range(1, 10) - .reduce((a, b) -> a + b); - - Mono mono = new MonoPeekTerminal<>(source, - successInvocation::set, - errorInvocation::set, - null); //afterTerminate forces the negotiation of fusion mode NONE - - StepVerifier.create(mono) - .expectFusion(Fuseable.ASYNC) - .expectNext(55) - .expectComplete() - .verify(); - - assertThat((Object) successInvocation.get()).isEqualTo(55); - assertThat(errorInvocation).hasValue(null); - } - @Test public void should_reduce_to_10_events() { for (int i = 0; i < 20; i++) { diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoPublishMulticastTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoPublishMulticastTest.java index b4697274b1..b5d522f186 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoPublishMulticastTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoPublishMulticastTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2015-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,12 +41,12 @@ void normal() { .publish(o -> o.flatMap(s -> Mono.just(2))); StepVerifier.create(m) - .expectFusion() + .expectNoFusionSupport() .expectNext(2) .verifyComplete(); StepVerifier.create(m) - .expectFusion() + .expectNoFusionSupport() .expectNext(2) .verifyComplete(); } @@ -139,24 +139,6 @@ void normalCancelBeforeComplete() { assertThat(Mono.just(Mono.just(1).hide().publish(v -> v)).flatMapMany(v -> v).blockLast()).isEqualTo(1); } - //see https://github.com/reactor/reactor-core/issues/2600 - @Test - void errorFused() { - final String errorMessage = "Error in Mono"; - final Mono source = Mono.error(new RuntimeException(errorMessage)); - final Mono published = source.publish(coordinator -> coordinator.flatMap(Mono::just)); - - StepVerifier.create(published) - .expectFusion() - .expectErrorMessage(errorMessage) - .verify(); - - StepVerifier.create(published, StepVerifierOptions.create().scenarioName("second shared invocation")) - .expectFusion() - .expectErrorMessage(errorMessage) - .verify(); - } - //see https://github.com/reactor/reactor-core/issues/2600 @Test void errorHide() { diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoSubscriberTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoSubscriberTest.java index fd3ca1501f..c7a6318280 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoSubscriberTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoSubscriberTest.java @@ -42,6 +42,13 @@ public void queueSubscriptionSyncRejected() { assertThat(ds.requestFusion(Fuseable.SYNC)).isEqualTo(Fuseable.NONE); } + @Test + public void queueSubscriptionAsyncRejected() { + MonoSubscriber ds = new MonoSubscriber<>(new AssertSubscriber<>()); + + assertThat(ds.requestFusion(Fuseable.ASYNC)).isEqualTo(Fuseable.NONE); + } + @Test public void clear() { MonoSubscriber ds = new MonoSubscriber<>(new AssertSubscriber<>()); @@ -50,7 +57,6 @@ public void clear() { ds.clear(); - assertThat(ds.state).isEqualTo(MonoSubscriber.FUSED_ASYNC_CONSUMED); assertThat(ds.value).isNull(); } diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoUsingTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoUsingTest.java index de6d277e3b..c34a174f86 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoUsingTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoUsingTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -337,25 +337,6 @@ public void smokeTestMapReduceGuardedByCleanup_normalEager() { assertThat(cleaned).isTrue(); } - @Test - public void smokeTestMapReduceGuardedByCleanup_fusedEager() { - AtomicBoolean cleaned = new AtomicBoolean(); - Mono.using(() -> cleaned, - ab -> Flux.just("foo", "bar", "baz") - .delayElements(Duration.ofMillis(100)) - .count() - .map(i -> "" + i + ab.get()), - ab -> ab.set(true), - true) - .as(StepVerifier::create) - .expectFusion() - .expectNext("3false") - .expectComplete() - .verify(); - - assertThat(cleaned).isTrue(); - } - @Test public void smokeTestMapReduceGuardedByCleanup_normalNotEager() { AtomicBoolean cleaned = new AtomicBoolean(); @@ -379,28 +360,6 @@ public void smokeTestMapReduceGuardedByCleanup_normalNotEager() { .untilAsserted(assertThat(cleaned)::isTrue); } - @Test - public void smokeTestMapReduceGuardedByCleanup_fusedNotEager() { - AtomicBoolean cleaned = new AtomicBoolean(); - Mono.using(() -> cleaned, - ab -> Flux.just("foo", "bar", "baz") - .delayElements(Duration.ofMillis(100)) - .count() - .map(i -> "" + i + ab.get()), - ab -> ab.set(true), - false) - .as(StepVerifier::create) - .expectFusion() - .expectNext("3false") - .expectComplete() - .verify(); - - //since the handler is executed after onComplete, we allow some delay - await().atMost(100, TimeUnit.MILLISECONDS) - .with().pollInterval(10, TimeUnit.MILLISECONDS) - .untilAsserted(assertThat(cleaned)::isTrue); - } - @Test public void scanOperator(){ MonoUsing test = new MonoUsing<>(() -> 1, r -> Mono.just(1), c -> {}, false); diff --git a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/MonoMetricsFuseableTest.java b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/MonoMetricsFuseableTest.java index e7ef429661..bebe89d11b 100644 --- a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/MonoMetricsFuseableTest.java +++ b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/MonoMetricsFuseableTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2018-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -327,32 +327,6 @@ public void completeEmptyNoFusion() { .satisfies(timer -> assertThat(timer.count()).as("timer count").isOne()); } - @Test - public void completeEmptyAsyncFusion() { - Mono source = Mono.fromCallable(() -> null); - - StepVerifier.create(new MonoMetricsFuseable<>(source)) - .expectFusion(Fuseable.ASYNC) - .verifyComplete(); - - Timer stcCompleteCounter = registry.find(REACTOR_DEFAULT_NAME + METER_FLOW_DURATION) - .tags(Tags.of(TAG_ON_COMPLETE)) - .timer(); - - Timer stcCompleteEmptyCounter = registry.find(REACTOR_DEFAULT_NAME + METER_FLOW_DURATION) - .tags(Tags.of(TAG_ON_COMPLETE_EMPTY)) - .timer(); - - assertThat(stcCompleteCounter) - .as("complete with element") - .isNull(); - - assertThat(stcCompleteEmptyCounter) - .as("complete without any element") - .isNotNull() - .satisfies(timer -> assertThat(timer.count()).as("timer count").isOne()); - } - @Test public void completeEmptySyncFusion() { MonoMetricsFuseable.MetricsFuseableSubscriber subscriber = @@ -409,33 +383,6 @@ public void completeWithElementNoFusion() { .isNull(); } - @Test - public void completeWithElementAsyncFusion() { - Mono source = Mono.fromCallable(() -> 1); - - StepVerifier.create(new MonoMetricsFuseable<>(source)) - .expectFusion(Fuseable.ASYNC) - .expectNext(1) - .verifyComplete(); - - Timer stcCompleteCounter = registry.find(REACTOR_DEFAULT_NAME + METER_FLOW_DURATION) - .tags(Tags.of(TAG_ON_COMPLETE)) - .timer(); - - Timer stcCompleteEmptyCounter = registry.find(REACTOR_DEFAULT_NAME + METER_FLOW_DURATION) - .tags(Tags.of(TAG_ON_COMPLETE_EMPTY)) - .timer(); - - assertThat(stcCompleteCounter) - .as("complete with element") - .isNotNull() - .satisfies(timer -> assertThat(timer.count()).as("timer count").isOne()); - - assertThat(stcCompleteEmptyCounter) - .as("complete without any element") - .isNull(); - } - @Test public void completeWithElementSyncFusion() { Mono source = Mono.just(1); @@ -463,46 +410,6 @@ public void completeWithElementSyncFusion() { .isNull(); } - @Test - public void subscribeToCompleteFuseable() { - Mono source = Mono.fromCallable(() -> { - Thread.sleep(100); - return "foo"; - }); - - StepVerifier.create(new MonoMetricsFuseable<>(source)) - .expectFusion(Fuseable.ASYNC) - .expectNext("foo") - .verifyComplete(); - - - Timer stcCompleteTimer = registry.find(REACTOR_DEFAULT_NAME + METER_FLOW_DURATION) - .tags(Tags.of(TAG_ON_COMPLETE)) - .timer(); - - Timer stcErrorTimer = registry.find(REACTOR_DEFAULT_NAME + METER_FLOW_DURATION) - .tags(Tags.of(TAG_ON_ERROR)) - .timer(); - - Timer stcCancelTimer = registry.find(REACTOR_DEFAULT_NAME + METER_FLOW_DURATION) - .tags(Tags.of(TAG_CANCEL)) - .timer(); - - SoftAssertions.assertSoftly(softly -> { - softly.assertThat(stcCompleteTimer.max(TimeUnit.MILLISECONDS)) - .as("subscribe to complete timer") - .isGreaterThanOrEqualTo(100); - - softly.assertThat(stcErrorTimer) - .as("subscribe to error timer lazily registered") - .isNull(); - - softly.assertThat(stcCancelTimer) - .as("subscribe to cancel timer") - .isNull(); - }); - } - @Test public void subscribeToErrorFuseable() { Mono source = Mono.delay(Duration.ofMillis(100))