diff --git a/core/src/main/java/com/linecorp/armeria/client/AbstractHttpResponseDecoder.java b/core/src/main/java/com/linecorp/armeria/client/AbstractHttpResponseDecoder.java index e1ba730eb38..c477c008389 100644 --- a/core/src/main/java/com/linecorp/armeria/client/AbstractHttpResponseDecoder.java +++ b/core/src/main/java/com/linecorp/armeria/client/AbstractHttpResponseDecoder.java @@ -62,8 +62,7 @@ public HttpResponseWrapper addResponse(@Nullable AbstractHttpRequestHandler requ int id, DecodedHttpResponse res, ClientRequestContext ctx, EventLoop eventLoop) { final HttpResponseWrapper newRes = - new HttpResponseWrapper(requestHandler, res, eventLoop, ctx, - ctx.responseTimeoutMillis(), ctx.maxResponseLength()); + new HttpResponseWrapper(requestHandler, res, eventLoop, ctx, ctx.maxResponseLength()); final HttpResponseWrapper oldRes = responses.put(id, newRes); keepAliveHandler().increaseNumRequests(); diff --git a/core/src/main/java/com/linecorp/armeria/client/HttpResponseWrapper.java b/core/src/main/java/com/linecorp/armeria/client/HttpResponseWrapper.java index 27aafaabd9c..dfd3a91fb6a 100644 --- a/core/src/main/java/com/linecorp/armeria/client/HttpResponseWrapper.java +++ b/core/src/main/java/com/linecorp/armeria/client/HttpResponseWrapper.java @@ -58,7 +58,6 @@ class HttpResponseWrapper implements StreamWriter { private final EventLoop eventLoop; private final ClientRequestContext ctx; private final long maxContentLength; - private final long responseTimeoutMillis; private boolean responseStarted; private long contentLengthHeaderValue = -1; @@ -66,15 +65,14 @@ class HttpResponseWrapper implements StreamWriter { private boolean done; private boolean closed; - HttpResponseWrapper(@Nullable AbstractHttpRequestHandler requestHandler, - DecodedHttpResponse delegate, EventLoop eventLoop, ClientRequestContext ctx, - long responseTimeoutMillis, long maxContentLength) { + HttpResponseWrapper(@Nullable AbstractHttpRequestHandler requestHandler, DecodedHttpResponse delegate, + EventLoop eventLoop, ClientRequestContext ctx, long maxContentLength) { + this.requestHandler = requestHandler; this.delegate = delegate; this.eventLoop = eventLoop; this.ctx = ctx; this.maxContentLength = maxContentLength; - this.responseTimeoutMillis = responseTimeoutMillis; } void handle100Continue(ResponseHeaders responseHeaders) { @@ -327,7 +325,6 @@ public String toString() { .add("eventLoop", eventLoop) .add("responseStarted", responseStarted) .add("maxContentLength", maxContentLength) - .add("responseTimeoutMillis", responseTimeoutMillis) .add("contentLengthHeaderValue", contentLengthHeaderValue) .add("delegate", delegate) .toString(); diff --git a/core/src/main/java/com/linecorp/armeria/client/WebSocketHttp1ClientChannelHandler.java b/core/src/main/java/com/linecorp/armeria/client/WebSocketHttp1ClientChannelHandler.java index f80cbe0c4c7..cf6d0171792 100644 --- a/core/src/main/java/com/linecorp/armeria/client/WebSocketHttp1ClientChannelHandler.java +++ b/core/src/main/java/com/linecorp/armeria/client/WebSocketHttp1ClientChannelHandler.java @@ -100,8 +100,7 @@ public HttpResponseWrapper addResponse(@Nullable AbstractHttpRequestHandler requ int id, DecodedHttpResponse decodedHttpResponse, ClientRequestContext ctx, EventLoop eventLoop) { assert res == null; - res = new WebSocketHttp1ResponseWrapper(decodedHttpResponse, eventLoop, ctx, - ctx.responseTimeoutMillis(), ctx.maxResponseLength()); + res = new WebSocketHttp1ResponseWrapper(decodedHttpResponse, eventLoop, ctx, ctx.maxResponseLength()); return res; } diff --git a/core/src/main/java/com/linecorp/armeria/client/WebSocketHttp1ResponseWrapper.java b/core/src/main/java/com/linecorp/armeria/client/WebSocketHttp1ResponseWrapper.java index 6d920f7116f..47abd673080 100644 --- a/core/src/main/java/com/linecorp/armeria/client/WebSocketHttp1ResponseWrapper.java +++ b/core/src/main/java/com/linecorp/armeria/client/WebSocketHttp1ResponseWrapper.java @@ -26,9 +26,8 @@ final class WebSocketHttp1ResponseWrapper extends HttpResponseWrapper { WebSocketHttp1ResponseWrapper(DecodedHttpResponse delegate, - EventLoop eventLoop, ClientRequestContext ctx, - long responseTimeoutMillis, long maxContentLength) { - super(null, delegate, eventLoop, ctx, responseTimeoutMillis, maxContentLength); + EventLoop eventLoop, ClientRequestContext ctx, long maxContentLength) { + super(null, delegate, eventLoop, ctx, maxContentLength); WebSocketClientUtil.setClosingResponseTask(ctx, cause -> { super.close(cause, false); }); diff --git a/core/src/main/java/com/linecorp/armeria/internal/client/ClientRequestContextExtension.java b/core/src/main/java/com/linecorp/armeria/internal/client/ClientRequestContextExtension.java index a818d4c8994..26a9081031e 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/client/ClientRequestContextExtension.java +++ b/core/src/main/java/com/linecorp/armeria/internal/client/ClientRequestContextExtension.java @@ -73,4 +73,6 @@ public interface ClientRequestContextExtension extends ClientRequestContext, Req * with default values on every request. */ HttpHeaders internalRequestHeaders(); + + long remainingTimeoutNanos(); } diff --git a/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java b/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java index 6af10aad221..4ad2e2bdeee 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java +++ b/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java @@ -540,8 +540,7 @@ private DefaultClientRequestContext(DefaultClientRequestContext ctx, log.startRequest(); // Cancel the original timeout and create a new scheduler for the derived context. ctx.responseCancellationScheduler.cancelScheduled(); - responseCancellationScheduler = - CancellationScheduler.ofClient(TimeUnit.MILLISECONDS.toNanos(ctx.responseTimeoutMillis())); + responseCancellationScheduler = CancellationScheduler.ofClient(ctx.remainingTimeoutNanos()); writeTimeoutMillis = ctx.writeTimeoutMillis(); maxResponseLength = ctx.maxResponseLength(); @@ -898,6 +897,11 @@ public HttpHeaders internalRequestHeaders() { return internalRequestHeaders; } + @Override + public long remainingTimeoutNanos() { + return responseCancellationScheduler().remainingTimeoutNanos(); + } + @Override public void setAdditionalRequestHeader(CharSequence name, Object value) { requireNonNull(name, "name"); diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/CancellationScheduler.java b/core/src/main/java/com/linecorp/armeria/internal/common/CancellationScheduler.java index a5b91af0950..7f5ff402ee6 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/CancellationScheduler.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/CancellationScheduler.java @@ -108,6 +108,12 @@ default void finishNow() { */ long timeoutNanos(); + /** + * Before the scheduler has started, the configured timeout will be returned regardless of the + * {@link TimeoutMode}. If the scheduler has already started, the remaining time will be returned. + */ + long remainingTimeoutNanos(); + long startTimeNanos(); CompletableFuture whenCancelling(); diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/DefaultCancellationScheduler.java b/core/src/main/java/com/linecorp/armeria/internal/common/DefaultCancellationScheduler.java index 698aa7e523f..f0e7bddd380 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/DefaultCancellationScheduler.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/DefaultCancellationScheduler.java @@ -112,12 +112,14 @@ public void start() { if (state != State.INIT) { return; } - state = State.SCHEDULED; startTimeNanos = ticker.read(); if (timeoutMode == TimeoutMode.SET_FROM_NOW) { final long elapsedTimeNanos = startTimeNanos - setFromNowStartNanos; timeoutNanos = Long.max(LongMath.saturatedSubtract(timeoutNanos, elapsedTimeNanos), 0); } + + // set the state after all timeout related fields are updated + state = State.SCHEDULED; if (timeoutNanos != Long.MAX_VALUE) { scheduledFuture = eventLoop().schedule(() -> invokeTask(null), timeoutNanos, NANOSECONDS); } @@ -292,6 +294,23 @@ public long timeoutNanos() { return timeoutNanos == Long.MAX_VALUE ? 0 : timeoutNanos; } + @Override + public long remainingTimeoutNanos() { + lock.lock(); + try { + if (timeoutNanos == Long.MAX_VALUE) { + return 0; + } + if (!isStarted()) { + return timeoutNanos; + } + final long elapsed = ticker.read() - startTimeNanos; + return Math.max(1, LongMath.saturatedSubtract(timeoutNanos, elapsed)); + } finally { + lock.unlock(); + } + } + @Override public long startTimeNanos() { return startTimeNanos; diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/NoopCancellationScheduler.java b/core/src/main/java/com/linecorp/armeria/internal/common/NoopCancellationScheduler.java index c6f6ac71b83..4bd6e94ffc9 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/NoopCancellationScheduler.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/NoopCancellationScheduler.java @@ -90,6 +90,11 @@ public long timeoutNanos() { return 0; } + @Override + public long remainingTimeoutNanos() { + return 0; + } + @Override public long startTimeNanos() { return 0; diff --git a/core/src/test/java/com/linecorp/armeria/client/retry/ResponseTimeoutFromStartTest.java b/core/src/test/java/com/linecorp/armeria/client/retry/ResponseTimeoutFromStartTest.java new file mode 100644 index 00000000000..077f2f88106 --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/client/retry/ResponseTimeoutFromStartTest.java @@ -0,0 +1,97 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.retry; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; + +import java.time.Duration; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; + +import org.assertj.core.data.Percentage; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.linecorp.armeria.client.ResponseTimeoutException; +import com.linecorp.armeria.client.ResponseTimeoutMode; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.QueryParams; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; + +class ResponseTimeoutFromStartTest { + + private static final Logger logger = LoggerFactory.getLogger(ResponseTimeoutFromStartTest.class); + + @RegisterExtension + static ServerExtension server = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) throws Exception { + sb.service("/", (ctx, req) -> { + final String delayMillisStr = ctx.queryParam("delayMillis"); + assertThat(delayMillisStr).isNotNull(); + final int delayMillis = Integer.parseInt(delayMillisStr); + return HttpResponse.delayed(HttpResponse.of(500), Duration.ofMillis(delayMillis)); + }); + } + }; + + @ParameterizedTest + @CsvSource({ + "0,2500,2000", + "0,1750,2000", + "5000,1500,2000", + }) + void originalResponseTimeoutRespected(long backoffMillis, long attemptMillis, long delayMillis) { + final long timeoutSeconds = 3; + final WebClient webClient = + WebClient.builder(server.httpUri()) + .responseTimeout(Duration.ofSeconds(timeoutSeconds)) + .responseTimeoutMode(ResponseTimeoutMode.FROM_START) + .decorator( + RetryingClient.builder(RetryRule.builder() + .onException() + .onServerErrorStatus() + .thenBackoff(Backoff.fixed(backoffMillis))) + .responseTimeoutForEachAttempt(Duration.ofMillis(attemptMillis)) + .maxTotalAttempts(Integer.MAX_VALUE) + .newDecorator()) + .build(); + + final long prev = System.nanoTime(); + final Throwable throwable = catchThrowable( + () -> webClient.get("/", QueryParams.of("delayMillis", delayMillis)).aggregate().join()); + assertThat(throwable) + .isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(ResponseTimeoutException.class); + logger.debug("elapsed time is: {}ms", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - prev)); + + if (backoffMillis > 0) { + assertThat(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - prev)) + .isLessThan(TimeUnit.SECONDS.toMillis(timeoutSeconds)); + } else { + + assertThat(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - prev)) + .isCloseTo(TimeUnit.SECONDS.toMillis(timeoutSeconds), Percentage.withPercentage(10)); + } + } +} diff --git a/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/ArmeriaClientCall.java b/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/ArmeriaClientCall.java index e996aa012b8..d00c4d572cc 100644 --- a/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/ArmeriaClientCall.java +++ b/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/ArmeriaClientCall.java @@ -241,7 +241,7 @@ public void start(Listener responseListener, Metadata metadata) { ctx.setResponseTimeout(TimeoutMode.SET_FROM_NOW, Duration.ofNanos(remainingNanos)); } } else { - remainingNanos = MILLISECONDS.toNanos(ctx.responseTimeoutMillis()); + remainingNanos = ctx.remainingTimeoutNanos(); } // Must come after handling deadline.