Skip to content

Commit

Permalink
Prevent FluxBufferTimeout requests < 0 when enough outstanding (#2892)
Browse files Browse the repository at this point in the history
This commit puts a simple guard against negative request in the
BufferTimeoutSubscriber#request method in order to cover cases
where discrepancies are introduced by the timeouts.

Specifically, this covers a scenario where delivery from upstream
is partial then times out, while downstream requests a small number
of buffers.

For instance:
  - with buffer size 10, let's assume a first request of 2 buffers
  - let's also assume the source is slow to emit at first and delivers
2 elements, then times out => first buffer request is met with timeout
  - then it emits one more element and stalls again => second buffer
request is met with timeout also
  - `request(1)` comes in from downstream

At this point we have `outstanding` at `2x10 - 3` = `17`.
A request for one buffer translates to `requestLimit = 10`.
We have sufficient outstanding request to cover for that buffer and
in `requestMore(n)` where `n = requestLimit - outstanding`, `n`
is negative. => this commit prevents that negative request.

Fixes #2839.
  • Loading branch information
simonbasle authored Feb 11, 2022
1 parent f7d2bac commit 6f33aa7
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -314,7 +314,9 @@ public void request(long n) {
}
else {
long requestLimit = Operators.multiplyCap(requested, batchSize);
requestMore(requestLimit - outstanding);
if (requestLimit > outstanding) {
requestMore(requestLimit - outstanding);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,8 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -109,6 +111,31 @@ public void bufferWithTimeoutThrowingExceptionOnTimeOrSizeIfDownstreamDemandIsLo
);
}

@Test
void bufferWithTimeoutAvoidingNegativeRequests() {
final List<Long> requestPattern = new CopyOnWriteArrayList<>();

StepVerifier.withVirtualTime(() ->
Flux.range(1, 3)
.delayElements(Duration.ofMillis(100))
.doOnRequest(requestPattern::add)
.bufferTimeout(5, Duration.ofMillis(100)),
0)
.expectSubscription()
.expectNoEvent(Duration.ofMillis(100))
.thenRequest(2)
.expectNoEvent(Duration.ofMillis(100))
.assertNext(s -> assertThat(s).containsExactly(1))
.expectNoEvent(Duration.ofMillis(100))
.assertNext(s -> assertThat(s).containsExactly(2))
.thenRequest(1) // This should not cause a negative upstream request
.expectNoEvent(Duration.ofMillis(100))
.thenCancel()
.verify();

assertThat(requestPattern).allSatisfy(r -> assertThat(r).isPositive());
}

@Test
public void scanSubscriber() {
CoreSubscriber<List<String>> actual = new LambdaSubscriber<>(null, e -> {}, null, null);
Expand Down

0 comments on commit 6f33aa7

Please sign in to comment.