Skip to content

Commit

Permalink
fix fabric8io#4911: initial work towards consolidating timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Apr 19, 2023
1 parent 338d3b0 commit cb5aeba
Show file tree
Hide file tree
Showing 48 changed files with 272 additions and 344 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
* Fix #4875: Removed unused options from the java-generator
* Fix #4910: all Pod file uploads not require commons-compress
* Fix #4998: Serialization.yamlMapper and Serialization.clearYamlMapper have been deprecated
* Fix #4911: Config/RequestConfig.scaleTimeout has been deprectated. withTimeout may be called before the scale operation.
* Fix #4911: Config/RequestConfig.websocketTimeout has been removed. Config/RequestConfig.requestTimeout will be used for websocket connection timeouts.
* Fix #4911: HttpClient api/building changes - writeTimeout has been removed, readTimeout has moved to the HttpRequest

### 6.5.1 (2023-03-20)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
/**
* TODO:
* - Mapping to a Reader is always UTF-8
* - determine if write timeout should be implemented
*/
public class JdkHttpClientImpl extends StandardHttpClient<JdkHttpClientImpl, JdkHttpClientFactory, JdkHttpClientBuilderImpl> {

Expand Down Expand Up @@ -258,7 +257,7 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeBytesDirect(StandardHtt
java.net.http.HttpRequest.Builder requestBuilder(StandardHttpRequest request) {
java.net.http.HttpRequest.Builder requestBuilder = java.net.http.HttpRequest.newBuilder();

Duration readTimeout = this.builder.getReadTimeout();
Duration readTimeout = request.getReadTimeout();
if (readTimeout != null && !java.time.Duration.ZERO.equals(readTimeout)) {
requestBuilder.timeout(readTimeout);
}
Expand Down Expand Up @@ -311,9 +310,7 @@ public CompletableFuture<WebSocketResponse> buildWebSocketDirect(
if (standardWebSocketBuilder.getSubprotocol() != null) {
newBuilder.subprotocols(standardWebSocketBuilder.getSubprotocol());
}
// the Watch logic sets a websocketTimeout as the readTimeout
// TODO: this should probably be made clearer in the docs
Duration readTimeout = this.builder.getReadTimeout();
Duration readTimeout = request.getReadTimeout();
if (readTimeout != null && !java.time.Duration.ZERO.equals(readTimeout)) {
newBuilder.connectTimeout(readTimeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ void testZeroTimeouts() {
JdkHttpClientBuilderImpl builder = factory.newBuilder();

// should build and be usable without an issue
try (HttpClient client = builder.readTimeout(0, TimeUnit.MILLISECONDS).connectTimeout(0, TimeUnit.MILLISECONDS)
.writeTimeout(0,
TimeUnit.MILLISECONDS)
.build();) {
try (HttpClient client = builder.connectTimeout(0, TimeUnit.MILLISECONDS).build();) {
assertNotNull(client.newHttpRequestBuilder().uri("http://localhost").build());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.http.AsyncBody;
import io.fabric8.kubernetes.client.http.AsyncBody.Consumer;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
import io.fabric8.kubernetes.client.http.StandardHttpClient;
import io.fabric8.kubernetes.client.http.StandardHttpClientBuilder;
Expand Down Expand Up @@ -100,7 +99,9 @@ private Request newRequest(StandardHttpRequest originalRequest) {
final var request = requestBuilder.build();

var jettyRequest = jetty.newRequest(request.uri()).method(request.method());
jettyRequest.timeout(builder.getReadTimeout().toMillis() + builder.getWriteTimeout().toMillis(), TimeUnit.MILLISECONDS);
if (originalRequest.getReadTimeout() != null) {
jettyRequest.timeout(originalRequest.getReadTimeout().toMillis(), TimeUnit.MILLISECONDS);
}
jettyRequest.headers(m -> request.headers().forEach((k, l) -> l.forEach(v -> m.add(k, v))));

final var contentType = Optional.ofNullable(request.getContentType());
Expand Down Expand Up @@ -136,14 +137,14 @@ public CompletableFuture<WebSocketResponse> buildWebSocketDirect(StandardWebSock
Listener listener) {
try {
jettyWs.start();
HttpRequest request = standardWebSocketBuilder.asHttpRequest();
StandardHttpRequest request = standardWebSocketBuilder.asHttpRequest();
final ClientUpgradeRequest cur = new ClientUpgradeRequest();
if (Utils.isNotNullOrEmpty(standardWebSocketBuilder.getSubprotocol())) {
cur.setSubProtocols(standardWebSocketBuilder.getSubprotocol());
}
cur.setHeaders(request.headers());
if (builder.getReadTimeout() != null) {
cur.setTimeout(builder.getReadTimeout().toMillis(), TimeUnit.MILLISECONDS);
if (request.getReadTimeout() != null) {
cur.setTimeout(request.getReadTimeout().toMillis(), TimeUnit.MILLISECONDS);
}
// Extra-future required because we can't Map the UpgradeException to a WebSocketHandshakeException easily
final CompletableFuture<WebSocketResponse> future = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,6 @@ protected JettyHttpClientBuilder newInstance(JettyHttpClientFactory clientFactor
return new JettyHttpClientBuilder(clientFactory);
}

@Override
public Duration getReadTimeout() {
return Optional.ofNullable(readTimeout).orElse(Duration.ZERO);
}

@Override
public Duration getWriteTimeout() {
return Optional.ofNullable(writeTimeout).orElse(Duration.ZERO);
}

@Override
public Duration getConnectTimeout() {
return Optional.ofNullable(connectTimeout).orElse(Duration.ZERO);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,12 @@ void newBuilderInstantiatesJettyHttpClientBuilderWithSameSettings() throws Excep
final var originalBuilder = new JettyHttpClientBuilder(null);
originalBuilder
.connectTimeout(1337, TimeUnit.SECONDS)
.readTimeout(1337, TimeUnit.SECONDS)
.tlsVersions(TlsVersion.SSL_3_0)
.followAllRedirects();
try (var firstClient = new JettyHttpClient(
originalBuilder, httpClient, webSocketClient)) {
// When
final var result = firstClient.newBuilder()
.readTimeout(313373, TimeUnit.SECONDS);
final var result = firstClient.newBuilder();
// Then
assertThat(result)
.isNotNull()
Expand All @@ -90,11 +88,11 @@ void newBuilderInstantiatesJettyHttpClientBuilderWithSameSettings() throws Excep
.isEqualTo(method.invoke(originalBuilder))
.isEqualTo(entry.getValue());
}
var readTimeout = StandardHttpClientBuilder.class.getDeclaredField("readTimeout");
readTimeout.setAccessible(true);
assertThat(readTimeout.get(result)).isEqualTo(Duration.ofSeconds(313373));
assertThat(readTimeout.get(originalBuilder)).isEqualTo(Duration.ofSeconds(1337));
readTimeout.setAccessible(false);
var connectTimeout = StandardHttpClientBuilder.class.getDeclaredField("connectTimeout");
connectTimeout.setAccessible(true);
assertThat(connectTimeout.get(result)).isEqualTo(Duration.ofSeconds(1337));
assertThat(connectTimeout.get(originalBuilder)).isEqualTo(Duration.ofSeconds(1337));
connectTimeout.setAccessible(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,27 +93,16 @@ public OkHttpClientImpl initialBuild(okhttp3.OkHttpClient.Builder builder) {
}

private OkHttpClientImpl derivedBuild(okhttp3.OkHttpClient.Builder builder) {
if (readTimeout != null) {
builder.readTimeout(this.readTimeout);
}
if (writeTimeout != null) {
builder.writeTimeout(this.writeTimeout);
}
if (authenticatorNone) {
builder.authenticator(Authenticator.NONE);
}
if (forStreaming) {
builder.cache(null);
}
OkHttpClient client = builder.build();
if (this.forStreaming) {
// If we set the HttpLoggingInterceptor's logging level to Body (as it is by default), it does
// not let us stream responses from the server.
for (Interceptor i : client.networkInterceptors()) {
if (i instanceof HttpLoggingInterceptor) {
HttpLoggingInterceptor interceptor = (HttpLoggingInterceptor) i;
interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
}
// If we set the HttpLoggingInterceptor's logging level to Body (as it is by default), it does
// not let us stream responses from the server.
for (Interceptor i : client.networkInterceptors()) {
if (i instanceof HttpLoggingInterceptor) {
HttpLoggingInterceptor interceptor = (HttpLoggingInterceptor) i;
interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,24 @@ public void close() {
}
}

private CompletableFuture<HttpResponse<AsyncBody>> sendAsync(HttpRequest request,
private CompletableFuture<HttpResponse<AsyncBody>> sendAsync(StandardHttpRequest request,
Function<BufferedSource, AsyncBody> handler) {
CompletableFuture<HttpResponse<AsyncBody>> future = new CompletableFuture<>();
Call call = httpClient.newCall(requestBuilder((StandardHttpRequest) request).build());

okhttp3.OkHttpClient.Builder clientBuilder = null;
if (request.getReadTimeout() != null) {
clientBuilder = httpClient.newBuilder();
clientBuilder.readTimeout(request.getReadTimeout());
}
if (request.isForStreaming()) {
if (clientBuilder == null) {
clientBuilder = httpClient.newBuilder();
}
clientBuilder.cache(null);
}

Call call = Optional.ofNullable(clientBuilder).map(okhttp3.OkHttpClient.Builder::build).orElse(httpClient)
.newCall(requestBuilder(request).build());
try {
call.enqueue(new Callback() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,16 @@ public CompletableFuture<WebSocketResponse> buildWebSocketDirect(StandardWebSock
WebSocket.Listener listener) {
WebSocketConnectOptions options = new WebSocketConnectOptions();

if (builder.getReadTimeout() != null) {
options.setTimeout(builder.getReadTimeout().toMillis());
}

if (standardWebSocketBuilder.getSubprotocol() != null) {
options.setSubProtocols(Collections.singletonList(standardWebSocketBuilder.getSubprotocol()));
}

StandardHttpRequest request = standardWebSocketBuilder.asHttpRequest();

if (request.getReadTimeout() != null) {
options.setTimeout(request.getReadTimeout().toMillis());
}

request.headers().entrySet().stream()
.forEach(e -> e.getValue().stream().forEach(v -> options.addHeader(e.getKey(), v)));
options.setAbsoluteURI(request.uri().toString());
Expand Down Expand Up @@ -140,6 +140,10 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeBytesDirect(StandardHtt
options.setAbsoluteURI(request.uri().toString());
options.setMethod(HttpMethod.valueOf(request.method()));

if (request.getReadTimeout() != null) {
options.setTimeout(request.getReadTimeout().toMillis());
}

// Proxy authorization is handled manually since the proxyAuthorization value is the actual header
if (proxyAuthorization != null) {
options.putHeader(HttpHeaders.PROXY_AUTHORIZATION, proxyAuthorization);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ public VertxHttpClient<F> build() {
options.setConnectTimeout((int) this.connectTimeout.toMillis());
}

if (this.writeTimeout != null) {
options.setWriteIdleTimeout((int) this.writeTimeout.getSeconds());
}

if (this.followRedirects) {
options.setFollowRedirects(followRedirects);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ void testZeroTimeouts() {
HttpClient.Builder builder = factory.newBuilder();

// should build and be usable without an issue
try (HttpClient client = builder.readTimeout(0, TimeUnit.MILLISECONDS).connectTimeout(0, TimeUnit.MILLISECONDS)
.writeTimeout(0,
TimeUnit.MILLISECONDS)
.build();) {
try (HttpClient client = builder.connectTimeout(0, TimeUnit.MILLISECONDS).build();) {
assertNotNull(client.newHttpRequestBuilder().uri("http://localhost").build());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ public class Config {
public static final String KUBERNETES_REQUEST_RETRY_BACKOFFINTERVAL_SYSTEM_PROPERTY = "kubernetes.request.retry.backoffInterval";
public static final String KUBERNETES_LOGGING_INTERVAL_SYSTEM_PROPERTY = "kubernetes.logging.interval";
public static final String KUBERNETES_SCALE_TIMEOUT_SYSTEM_PROPERTY = "kubernetes.scale.timeout";
public static final String KUBERNETES_WEBSOCKET_TIMEOUT_SYSTEM_PROPERTY = "kubernetes.websocket.timeout";
public static final String KUBERNETES_WEBSOCKET_PING_INTERVAL_SYSTEM_PROPERTY = "kubernetes.websocket.ping.interval";
public static final String KUBERNETES_MAX_CONCURRENT_REQUESTS = "kubernetes.max.concurrent.requests";
public static final String KUBERNETES_MAX_CONCURRENT_REQUESTS_PER_HOST = "kubernetes.max.concurrent.requests.per.host";
Expand Down Expand Up @@ -136,7 +135,6 @@ public class Config {
public static final Long DEFAULT_SCALE_TIMEOUT = 10 * 60 * 1000L;
public static final int DEFAULT_REQUEST_TIMEOUT = 10 * 1000;
public static final int DEFAULT_LOGGING_INTERVAL = 20 * 1000;
public static final Long DEFAULT_WEBSOCKET_TIMEOUT = 5 * 1000L;
public static final Long DEFAULT_WEBSOCKET_PING_INTERVAL = 30 * 1000L;

public static final Integer DEFAULT_MAX_CONCURRENT_REQUESTS = 64;
Expand Down Expand Up @@ -197,7 +195,6 @@ public class Config {
private int requestTimeout = DEFAULT_REQUEST_TIMEOUT;
private long scaleTimeout = DEFAULT_SCALE_TIMEOUT;
private int loggingInterval = DEFAULT_LOGGING_INTERVAL;
private long websocketTimeout = DEFAULT_WEBSOCKET_TIMEOUT;
private String impersonateUsername;

/**
Expand Down Expand Up @@ -321,14 +318,14 @@ public Config(String masterUrl, String apiVersion, String namespace, boolean tru
String oauthToken, int watchReconnectInterval, int watchReconnectLimit, int connectionTimeout, int requestTimeout,
long rollingTimeout, long scaleTimeout, int loggingInterval, int maxConcurrentRequests, int maxConcurrentRequestsPerHost,
String httpProxy, String httpsProxy, String[] noProxy, Map<Integer, String> errorMessages, String userAgent,
TlsVersion[] tlsVersions, long websocketTimeout, long websocketPingInterval, String proxyUsername, String proxyPassword,
TlsVersion[] tlsVersions, long websocketPingInterval, String proxyUsername, String proxyPassword,
String trustStoreFile, String trustStorePassphrase, String keyStoreFile, String keyStorePassphrase,
String impersonateUsername, String[] impersonateGroups, Map<String, List<String>> impersonateExtras) {
this(masterUrl, apiVersion, namespace, trustCerts, disableHostnameVerification, caCertFile, caCertData, clientCertFile,
clientCertData, clientKeyFile, clientKeyData, clientKeyAlgo, clientKeyPassphrase, username, password, oauthToken,
watchReconnectInterval, watchReconnectLimit, connectionTimeout, requestTimeout, scaleTimeout,
loggingInterval, maxConcurrentRequests, maxConcurrentRequestsPerHost, false, httpProxy, httpsProxy, noProxy,
errorMessages, userAgent, tlsVersions, websocketTimeout, websocketPingInterval, proxyUsername, proxyPassword,
errorMessages, userAgent, tlsVersions, websocketPingInterval, proxyUsername, proxyPassword,
trustStoreFile, trustStorePassphrase, keyStoreFile, keyStorePassphrase, impersonateUsername, impersonateGroups,
impersonateExtras, null, null, DEFAULT_REQUEST_RETRY_BACKOFFLIMIT, DEFAULT_REQUEST_RETRY_BACKOFFINTERVAL,
DEFAULT_UPLOAD_REQUEST_TIMEOUT);
Expand All @@ -341,7 +338,7 @@ public Config(String masterUrl, String apiVersion, String namespace, boolean tru
String oauthToken, int watchReconnectInterval, int watchReconnectLimit, int connectionTimeout, int requestTimeout,
long scaleTimeout, int loggingInterval, int maxConcurrentRequests, int maxConcurrentRequestsPerHost,
boolean http2Disable, String httpProxy, String httpsProxy, String[] noProxy, Map<Integer, String> errorMessages,
String userAgent, TlsVersion[] tlsVersions, long websocketTimeout, long websocketPingInterval, String proxyUsername,
String userAgent, TlsVersion[] tlsVersions, long websocketPingInterval, String proxyUsername,
String proxyPassword, String trustStoreFile, String trustStorePassphrase, String keyStoreFile, String keyStorePassphrase,
String impersonateUsername, String[] impersonateGroups, Map<String, List<String>> impersonateExtras,
OAuthTokenProvider oauthTokenProvider, Map<String, String> customHeaders, int requestRetryBackoffLimit,
Expand All @@ -365,7 +362,7 @@ public Config(String masterUrl, String apiVersion, String namespace, boolean tru
this.connectionTimeout = connectionTimeout;

this.requestConfig = new RequestConfig(watchReconnectLimit, watchReconnectInterval,
requestTimeout, scaleTimeout, loggingInterval, websocketTimeout,
requestTimeout, scaleTimeout, loggingInterval,
requestRetryBackoffLimit, requestRetryBackoffInterval, uploadRequestTimeout);
this.requestConfig.setImpersonateUsername(impersonateUsername);
this.requestConfig.setImpersonateGroups(impersonateGroups);
Expand Down Expand Up @@ -473,12 +470,6 @@ public static void configFromSysPropsOrEnvVars(Config config) {
config.setRequestRetryBackoffInterval(Utils.getSystemPropertyOrEnvVar(
KUBERNETES_REQUEST_RETRY_BACKOFFINTERVAL_SYSTEM_PROPERTY, config.getRequestRetryBackoffInterval()));

String configuredWebsocketTimeout = Utils.getSystemPropertyOrEnvVar(KUBERNETES_WEBSOCKET_TIMEOUT_SYSTEM_PROPERTY,
String.valueOf(config.getWebsocketTimeout()));
if (configuredWebsocketTimeout != null) {
config.setWebsocketTimeout(Long.parseLong(configuredWebsocketTimeout));
}

String configuredWebsocketPingInterval = Utils.getSystemPropertyOrEnvVar(KUBERNETES_WEBSOCKET_PING_INTERVAL_SYSTEM_PROPERTY,
String.valueOf(config.getWebsocketPingInterval()));
if (configuredWebsocketPingInterval != null) {
Expand Down Expand Up @@ -1304,15 +1295,6 @@ public void setTlsVersions(TlsVersion[] tlsVersions) {
this.tlsVersions = tlsVersions;
}

@JsonProperty("websocketTimeout")
public long getWebsocketTimeout() {
return getRequestConfig().getWebsocketTimeout();
}

public void setWebsocketTimeout(long websocketTimeout) {
this.requestConfig.setWebsocketTimeout(websocketTimeout);
}

@JsonProperty("websocketPingInterval")
public long getWebsocketPingInterval() {
return websocketPingInterval;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import static io.fabric8.kubernetes.client.Config.DEFAULT_REQUEST_TIMEOUT;
import static io.fabric8.kubernetes.client.Config.DEFAULT_SCALE_TIMEOUT;
import static io.fabric8.kubernetes.client.Config.DEFAULT_UPLOAD_REQUEST_TIMEOUT;
import static io.fabric8.kubernetes.client.Config.DEFAULT_WEBSOCKET_TIMEOUT;

public class RequestConfig {

Expand All @@ -46,20 +45,18 @@ public class RequestConfig {
private int requestTimeout = DEFAULT_REQUEST_TIMEOUT;
private long scaleTimeout = DEFAULT_SCALE_TIMEOUT;
private int loggingInterval = DEFAULT_LOGGING_INTERVAL;
private long websocketTimeout = DEFAULT_WEBSOCKET_TIMEOUT;

RequestConfig() {
}

@Buildable(builderPackage = "io.fabric8.kubernetes.api.builder", editableEnabled = false)
public RequestConfig(int watchReconnectLimit, int watchReconnectInterval, int requestTimeout,
long scaleTimeout, int loggingInterval, long websocketTimeout, int requestRetryBackoffLimit,
long scaleTimeout, int loggingInterval, int requestRetryBackoffLimit,
int requestRetryBackoffInterval, int uploadRequestTimeout) {
this.watchReconnectLimit = watchReconnectLimit;
this.watchReconnectInterval = watchReconnectInterval;
this.requestTimeout = requestTimeout;
this.scaleTimeout = scaleTimeout;
this.websocketTimeout = websocketTimeout;
this.loggingInterval = loggingInterval;
this.requestRetryBackoffLimit = requestRetryBackoffLimit;
this.requestRetryBackoffInterval = requestRetryBackoffInterval;
Expand Down Expand Up @@ -130,14 +127,6 @@ public void setLoggingInterval(int loggingInterval) {
this.loggingInterval = loggingInterval;
}

public long getWebsocketTimeout() {
return websocketTimeout;
}

public void setWebsocketTimeout(long websocketTimeout) {
this.websocketTimeout = websocketTimeout;
}

public void setImpersonateUsername(String impersonateUsername) {
this.impersonateUsername = impersonateUsername;
}
Expand Down
Loading

0 comments on commit cb5aeba

Please sign in to comment.