Skip to content

Commit

Permalink
Add MonoFirstWithValue inflight cancellation test (#3915)
Browse files Browse the repository at this point in the history
The new test case validates that sources which are already subscribed to
are properly cancelled and only the first delivered value is propagated.
  • Loading branch information
subbarao authored Nov 7, 2024
1 parent 6692812 commit 3ba3cc1
Showing 1 changed file with 33 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,11 +20,14 @@
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

import org.junit.jupiter.api.Test;

import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;
import reactor.test.subscriber.AssertSubscriber;
Expand Down Expand Up @@ -172,6 +175,35 @@ void cancelIsPropagated() {
pub2.assertMaxRequested(1);
pub2.assertWasCancelled();
}
@Test
void cancelInflightMono() {
AtomicLong cancelledInflightMonos = new AtomicLong(0);
CountDownLatch inflightMonoLatch = new CountDownLatch(1);
CountDownLatch completedMonoLatch = new CountDownLatch(1);
Mono<Integer> inflightMono1 = Mono.fromCallable(() -> {
inflightMonoLatch.await();
return 1;
}).doOnCancel(cancelledInflightMonos::getAndIncrement).subscribeOn(Schedulers.boundedElastic());

Mono<Integer> inflightMono2 = Mono.fromCallable(() -> {
inflightMonoLatch.await();
return 2;
}).doOnCancel(cancelledInflightMonos::getAndIncrement).subscribeOn(Schedulers.boundedElastic());

Mono<Integer> completedMono = Mono.fromCallable(() -> {
completedMonoLatch.await();
return 3;
}).doOnCancel(cancelledInflightMonos::getAndIncrement).subscribeOn(Schedulers.boundedElastic());

StepVerifier.create(Mono.firstWithValue(inflightMono1, inflightMono2, completedMono))
.then(() -> {
completedMonoLatch.countDown();
})
.expectNext(3)
.verifyComplete();

assertThat(cancelledInflightMonos).hasValue(2);
}

@Test
void singleArrayNullSource() {
Expand Down

0 comments on commit 3ba3cc1

Please sign in to comment.