Skip to content

Commit

Permalink
ResponseTimeoutMode.FROM_START works correctly with RetryingClient (
Browse files Browse the repository at this point in the history
#6025)

Motivation:

A bug was reported that `ResponseTimeoutMode.FROM_START` does not work
correctly when used with a `RetryingClient`. The cause was because how
the `responseTimeout` is calculated for `RetryingClient`.

`RetryingClient` bounds `responseTimeout` by computing the
`responseTimeout` on each iteration from its internal `State`.

https://github.com/line/armeria/blob/fa76e99fa6132545df3a8d05eeb81c5681ec8953/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java#L188

If the `CancellationScheduler` has not been started yet, the set timeout
is returned as-is via `CancellationScheduler#timeoutNanos` and is set at
for the derived ctx.

https://github.com/line/armeria/blob/fa76e99fa6132545df3a8d05eeb81c5681ec8953/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java#L543-L544

However, `CancellationScheduler#timeoutNanos` defines its contract as
returning the `timeoutNanos` if not started, and returning
`timeoutNanos` since the `startTime` if already started.

https://github.com/line/armeria/blob/fa76e99fa6132545df3a8d05eeb81c5681ec8953/core/src/main/java/com/linecorp/armeria/internal/common/CancellationScheduler.java#L104-L108

Hence, `CancellationScheduler#setTimeoutNanos` tries to set the time
remaining, but `CancellationScheduler#timeoutNanos` will return the
timeout since `CancellationScheduler#start` is called.

Since the semantics of `CancellationScheduler#timeoutNanos` has value in
retaining the originally set value, I propose that a new
`CancellationScheduler#remainingTimeoutNanos` is introduced which
returns the remaining timeout if a scheduler has been started.

Modifications:

- Introduced `CancellationScheduler#remainingTimeoutNanos` which returns
the remaining `responseTimeout` in nanos.
- Replaced `ClientRequestContext#responseTimeoutMillis` with
`ClientRequestContextExtension#remainingTimeoutNanos` in
`ArmeriaClientCall` and `DefaultClientRequestContext`
- Removed unneeded usages of
`ClientRequestContext#responseTimeoutMillis` in `HttpResponseWrapper`

Result:

- `ResponseTimeoutMode.FROM_START` correctly bounds requests that go
through `RetryingClient`
  • Loading branch information
jrhee17 authored Dec 11, 2024
1 parent e3248a1 commit 244e5cb
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ public HttpResponseWrapper addResponse(@Nullable AbstractHttpRequestHandler requ
int id, DecodedHttpResponse res,
ClientRequestContext ctx, EventLoop eventLoop) {
final HttpResponseWrapper newRes =
new HttpResponseWrapper(requestHandler, res, eventLoop, ctx,
ctx.responseTimeoutMillis(), ctx.maxResponseLength());
new HttpResponseWrapper(requestHandler, res, eventLoop, ctx, ctx.maxResponseLength());
final HttpResponseWrapper oldRes = responses.put(id, newRes);
keepAliveHandler().increaseNumRequests();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,21 @@ class HttpResponseWrapper implements StreamWriter<HttpObject> {
private final EventLoop eventLoop;
private final ClientRequestContext ctx;
private final long maxContentLength;
private final long responseTimeoutMillis;

private boolean responseStarted;
private long contentLengthHeaderValue = -1;

private boolean done;
private boolean closed;

HttpResponseWrapper(@Nullable AbstractHttpRequestHandler requestHandler,
DecodedHttpResponse delegate, EventLoop eventLoop, ClientRequestContext ctx,
long responseTimeoutMillis, long maxContentLength) {
HttpResponseWrapper(@Nullable AbstractHttpRequestHandler requestHandler, DecodedHttpResponse delegate,
EventLoop eventLoop, ClientRequestContext ctx, long maxContentLength) {

this.requestHandler = requestHandler;
this.delegate = delegate;
this.eventLoop = eventLoop;
this.ctx = ctx;
this.maxContentLength = maxContentLength;
this.responseTimeoutMillis = responseTimeoutMillis;
}

void handle100Continue(ResponseHeaders responseHeaders) {
Expand Down Expand Up @@ -327,7 +325,6 @@ public String toString() {
.add("eventLoop", eventLoop)
.add("responseStarted", responseStarted)
.add("maxContentLength", maxContentLength)
.add("responseTimeoutMillis", responseTimeoutMillis)
.add("contentLengthHeaderValue", contentLengthHeaderValue)
.add("delegate", delegate)
.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ public HttpResponseWrapper addResponse(@Nullable AbstractHttpRequestHandler requ
int id, DecodedHttpResponse decodedHttpResponse,
ClientRequestContext ctx, EventLoop eventLoop) {
assert res == null;
res = new WebSocketHttp1ResponseWrapper(decodedHttpResponse, eventLoop, ctx,
ctx.responseTimeoutMillis(), ctx.maxResponseLength());
res = new WebSocketHttp1ResponseWrapper(decodedHttpResponse, eventLoop, ctx, ctx.maxResponseLength());
return res;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@
final class WebSocketHttp1ResponseWrapper extends HttpResponseWrapper {

WebSocketHttp1ResponseWrapper(DecodedHttpResponse delegate,
EventLoop eventLoop, ClientRequestContext ctx,
long responseTimeoutMillis, long maxContentLength) {
super(null, delegate, eventLoop, ctx, responseTimeoutMillis, maxContentLength);
EventLoop eventLoop, ClientRequestContext ctx, long maxContentLength) {
super(null, delegate, eventLoop, ctx, maxContentLength);
WebSocketClientUtil.setClosingResponseTask(ctx, cause -> {
super.close(cause, false);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,6 @@ public interface ClientRequestContextExtension extends ClientRequestContext, Req
* with default values on every request.
*/
HttpHeaders internalRequestHeaders();

long remainingTimeoutNanos();
}
Original file line number Diff line number Diff line change
Expand Up @@ -540,8 +540,7 @@ private DefaultClientRequestContext(DefaultClientRequestContext ctx,
log.startRequest();
// Cancel the original timeout and create a new scheduler for the derived context.
ctx.responseCancellationScheduler.cancelScheduled();
responseCancellationScheduler =
CancellationScheduler.ofClient(TimeUnit.MILLISECONDS.toNanos(ctx.responseTimeoutMillis()));
responseCancellationScheduler = CancellationScheduler.ofClient(ctx.remainingTimeoutNanos());
writeTimeoutMillis = ctx.writeTimeoutMillis();
maxResponseLength = ctx.maxResponseLength();

Expand Down Expand Up @@ -898,6 +897,11 @@ public HttpHeaders internalRequestHeaders() {
return internalRequestHeaders;
}

@Override
public long remainingTimeoutNanos() {
return responseCancellationScheduler().remainingTimeoutNanos();
}

@Override
public void setAdditionalRequestHeader(CharSequence name, Object value) {
requireNonNull(name, "name");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ default void finishNow() {
*/
long timeoutNanos();

/**
* Before the scheduler has started, the configured timeout will be returned regardless of the
* {@link TimeoutMode}. If the scheduler has already started, the remaining time will be returned.
*/
long remainingTimeoutNanos();

long startTimeNanos();

CompletableFuture<Throwable> whenCancelling();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,14 @@ public void start() {
if (state != State.INIT) {
return;
}
state = State.SCHEDULED;
startTimeNanos = ticker.read();
if (timeoutMode == TimeoutMode.SET_FROM_NOW) {
final long elapsedTimeNanos = startTimeNanos - setFromNowStartNanos;
timeoutNanos = Long.max(LongMath.saturatedSubtract(timeoutNanos, elapsedTimeNanos), 0);
}

// set the state after all timeout related fields are updated
state = State.SCHEDULED;
if (timeoutNanos != Long.MAX_VALUE) {
scheduledFuture = eventLoop().schedule(() -> invokeTask(null), timeoutNanos, NANOSECONDS);
}
Expand Down Expand Up @@ -292,6 +294,23 @@ public long timeoutNanos() {
return timeoutNanos == Long.MAX_VALUE ? 0 : timeoutNanos;
}

@Override
public long remainingTimeoutNanos() {
lock.lock();
try {
if (timeoutNanos == Long.MAX_VALUE) {
return 0;
}
if (!isStarted()) {
return timeoutNanos;
}
final long elapsed = ticker.read() - startTimeNanos;
return Math.max(1, LongMath.saturatedSubtract(timeoutNanos, elapsed));
} finally {
lock.unlock();
}
}

@Override
public long startTimeNanos() {
return startTimeNanos;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ public long timeoutNanos() {
return 0;
}

@Override
public long remainingTimeoutNanos() {
return 0;
}

@Override
public long startTimeNanos() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.client.retry;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;

import java.time.Duration;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

import org.assertj.core.data.Percentage;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.client.ResponseTimeoutException;
import com.linecorp.armeria.client.ResponseTimeoutMode;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.QueryParams;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;

class ResponseTimeoutFromStartTest {

private static final Logger logger = LoggerFactory.getLogger(ResponseTimeoutFromStartTest.class);

@RegisterExtension
static ServerExtension server = new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) throws Exception {
sb.service("/", (ctx, req) -> {
final String delayMillisStr = ctx.queryParam("delayMillis");
assertThat(delayMillisStr).isNotNull();
final int delayMillis = Integer.parseInt(delayMillisStr);
return HttpResponse.delayed(HttpResponse.of(500), Duration.ofMillis(delayMillis));
});
}
};

@ParameterizedTest
@CsvSource({
"0,2500,2000",
"0,1750,2000",
"5000,1500,2000",
})
void originalResponseTimeoutRespected(long backoffMillis, long attemptMillis, long delayMillis) {
final long timeoutSeconds = 3;
final WebClient webClient =
WebClient.builder(server.httpUri())
.responseTimeout(Duration.ofSeconds(timeoutSeconds))
.responseTimeoutMode(ResponseTimeoutMode.FROM_START)
.decorator(
RetryingClient.builder(RetryRule.builder()
.onException()
.onServerErrorStatus()
.thenBackoff(Backoff.fixed(backoffMillis)))
.responseTimeoutForEachAttempt(Duration.ofMillis(attemptMillis))
.maxTotalAttempts(Integer.MAX_VALUE)
.newDecorator())
.build();

final long prev = System.nanoTime();
final Throwable throwable = catchThrowable(
() -> webClient.get("/", QueryParams.of("delayMillis", delayMillis)).aggregate().join());
assertThat(throwable)
.isInstanceOf(CompletionException.class)
.hasCauseInstanceOf(ResponseTimeoutException.class);
logger.debug("elapsed time is: {}ms", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - prev));

if (backoffMillis > 0) {
assertThat(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - prev))
.isLessThan(TimeUnit.SECONDS.toMillis(timeoutSeconds));
} else {

assertThat(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - prev))
.isCloseTo(TimeUnit.SECONDS.toMillis(timeoutSeconds), Percentage.withPercentage(10));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public void start(Listener<O> responseListener, Metadata metadata) {
ctx.setResponseTimeout(TimeoutMode.SET_FROM_NOW, Duration.ofNanos(remainingNanos));
}
} else {
remainingNanos = MILLISECONDS.toNanos(ctx.responseTimeoutMillis());
remainingNanos = ctx.remainingTimeoutNanos();
}

// Must come after handling deadline.
Expand Down

0 comments on commit 244e5cb

Please sign in to comment.