Skip to content

Commit

Permalink
[FLINK-36031] [kubernetes-client] Migrate to Fabric8 interceptor API
Browse files Browse the repository at this point in the history
  • Loading branch information
SamBarker authored Aug 22, 2024
1 parent 84c1934 commit 39e10d0
Show file tree
Hide file tree
Showing 8 changed files with 853 additions and 60 deletions.
1 change: 0 additions & 1 deletion examples/flink-sql-runner-example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ under the License.
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- Add connector dependencies here. They must be in the default scope (compile). -->

<!-- Example:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,29 @@

package org.apache.flink.kubernetes.operator.metrics;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils.SynchronizedMeterView;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;

import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;
import io.fabric8.kubernetes.client.http.AsyncBody;
import io.fabric8.kubernetes.client.http.BasicBuilder;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
import io.fabric8.kubernetes.client.http.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

/** Kubernetes client metrics. */
public class KubernetesClientMetrics implements Interceptor {
Expand All @@ -50,6 +57,7 @@ public class KubernetesClientMetrics implements Interceptor {
public static final String COUNTER = "Count";
public static final String METER = "NumPerSecond";
public static final String HISTO = "TimeNanos";
public static final String REQUEST_START_TIME_HEADER = "requestStartTimeNanos";
private final Histogram responseLatency;

private final MetricGroup requestMetricGroup;
Expand All @@ -69,9 +77,20 @@ public class KubernetesClientMetrics implements Interceptor {
private final Map<Integer, SynchronizedMeterView> responseCodeMeters =
new ConcurrentHashMap<>();
private final Map<String, Counter> requestMethodCounter = new ConcurrentHashMap<>();
private final LongSupplier nanoTimeSource;

private final Logger logger = LoggerFactory.getLogger(KubernetesClientMetrics.class);

public KubernetesClientMetrics(
MetricGroup parentGroup, FlinkOperatorConfiguration flinkOperatorConfiguration) {
this(parentGroup, flinkOperatorConfiguration, System::nanoTime);
}

public KubernetesClientMetrics(
MetricGroup parentGroup,
FlinkOperatorConfiguration flinkOperatorConfiguration,
LongSupplier nanoTimeSource) {
this.nanoTimeSource = nanoTimeSource;
MetricGroup metricGroup = parentGroup.addGroup(KUBE_CLIENT_GROUP);

this.requestMetricGroup = metricGroup.addGroup(HTTP_REQUEST_GROUP);
Expand Down Expand Up @@ -121,29 +140,86 @@ public KubernetesClientMetrics(
}

@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
public void before(BasicBuilder builder, HttpRequest request, RequestTags tags) {
long requestStartTime = nanoTimeSource.getAsLong();
// Attach a header to the request. We don't care if is actually sent or echoed back in the
// response.
// As the request is included in the after callbacks so we just read the value from the
// headers on that.
builder.setHeader(REQUEST_START_TIME_HEADER, String.valueOf(requestStartTime));
updateRequestMetrics(request);
Response response = null;
final long startTime = System.nanoTime();
try {
response = chain.proceed(request);
return response;
} finally {
updateResponseMetrics(response, startTime);
}
}

private void updateRequestMetrics(Request request) {
@Override
public void after(
HttpRequest request,
HttpResponse<?> response,
AsyncBody.Consumer<List<ByteBuffer>> consumer) {
trackRequestLatency(request);
updateResponseMetrics(response);
}

@Override
public CompletableFuture<Boolean> afterFailure(
BasicBuilder builder, HttpResponse<?> response, RequestTags tags) {
this.requestFailedRateMeter.markEvent();
return CompletableFuture.completedFuture(false);
}

@Override
public void afterConnectionFailure(HttpRequest request, Throwable failure) {
trackRequestLatency(request);
this.requestFailedRateMeter.markEvent();
}

@VisibleForTesting
Counter getRequestCounter() {
return requestCounter;
}

@VisibleForTesting
Counter getResponseCounter() {
return responseCounter;
}

@VisibleForTesting
Counter getRequestMethodCounter(String method) {
return requestMethodCounter.get(method);
}

@VisibleForTesting
SynchronizedMeterView getRequestRateMeter() {
return requestRateMeter;
}

@VisibleForTesting
SynchronizedMeterView getResponseCodeMeter(int statusCode) {
return responseCodeMeters.get(statusCode);
}

@VisibleForTesting
List<SynchronizedMeterView> getResponseCodeGroupMeters() {
return responseCodeGroupMeters;
}

@VisibleForTesting
Histogram getResponseLatency() {
return responseLatency;
}

@VisibleForTesting
SynchronizedMeterView getRequestFailedRateMeter() {
return requestFailedRateMeter;
}

private void updateRequestMetrics(HttpRequest request) {
this.requestRateMeter.markEvent();
getCounterByRequestMethod(request.method()).inc();
}

private void updateResponseMetrics(Response response, long startTimeNanos) {
final long latency = System.nanoTime() - startTimeNanos;
private void updateResponseMetrics(HttpResponse<?> response) {
if (response != null) {
this.responseRateMeter.markEvent();
this.responseLatency.update(latency);
getMeterViewByResponseCode(response.code()).markEvent();
if (this.httpResponseCodeGroupsEnabled) {
responseCodeGroupMeters.get(response.code() / 100 - 1).markEvent();
Expand All @@ -153,6 +229,16 @@ private void updateResponseMetrics(Response response, long startTimeNanos) {
}
}

private void trackRequestLatency(HttpRequest request) {
final String header = request.header(REQUEST_START_TIME_HEADER);
if (header != null) {
final long currentNanos = nanoTimeSource.getAsLong();
final long requestStartNanos = Long.parseLong(header);
final long latency = currentNanos - requestStartNanos;
this.responseLatency.update(latency);
}
}

private Counter getCounterByRequestMethod(String method) {
return requestMethodCounter.computeIfAbsent(
method,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory;
import okhttp3.OkHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,6 +35,8 @@ public class KubernetesClientUtils {

private static final Logger LOG = LoggerFactory.getLogger(KubernetesClientUtils.class);

private static final String METRICS_INTERCEPTOR_NAME = "KubernetesClientMetrics";

public static KubernetesClient getKubernetesClient(
FlinkOperatorConfiguration operatorConfig, MetricGroup metricGroup) {
return getKubernetesClient(operatorConfig, metricGroup, null);
Expand All @@ -51,19 +51,11 @@ public static KubernetesClient getKubernetesClient(
var clientBuilder = new KubernetesClientBuilder().withConfig(kubernetesClientConfig);

if (operatorConfig.isKubernetesClientMetricsEnabled()) {
clientBuilder =
clientBuilder.withHttpClientFactory(
// This logic should be replaced with a more generic solution once the
// fabric8 Interceptor class is improved to the point where this can be
// implemented.
new OkHttpClientFactory() {
@Override
protected void additionalConfig(OkHttpClient.Builder builder) {
builder.addInterceptor(
new KubernetesClientMetrics(
metricGroup, operatorConfig));
}
});
clientBuilder.withHttpClientBuilderConsumer(
httpCLientBuilder ->
httpCLientBuilder.addOrReplaceInterceptor(
METRICS_INTERCEPTOR_NAME,
new KubernetesClientMetrics(metricGroup, operatorConfig)));
}

return clientBuilder.build();
Expand Down
50 changes: 25 additions & 25 deletions flink-kubernetes-operator/src/main/resources/META-INF/NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,31 @@ This project bundles the following dependencies under the Apache Software Licens
- commons-cli:commons-cli:1.5.0
- commons-collections:commons-collections:3.2.2
- commons-io:commons-io:2.11.0
- io.fabric8:kubernetes-client-api:jar:6.11.0
- io.fabric8:kubernetes-client:jar:6.11.0
- io.fabric8:kubernetes-httpclient-okhttp:jar:6.11.0
- io.fabric8:kubernetes-model-admissionregistration:jar:6.11.0
- io.fabric8:kubernetes-model-apiextensions:jar:6.11.0
- io.fabric8:kubernetes-model-apps:jar:6.11.0
- io.fabric8:kubernetes-model-autoscaling:jar:6.11.0
- io.fabric8:kubernetes-model-batch:jar:6.11.0
- io.fabric8:kubernetes-model-certificates:jar:6.11.0
- io.fabric8:kubernetes-model-common:jar:6.11.0
- io.fabric8:kubernetes-model-coordination:jar:6.11.0
- io.fabric8:kubernetes-model-core:jar:6.11.0
- io.fabric8:kubernetes-model-discovery:jar:6.11.0
- io.fabric8:kubernetes-model-events:jar:6.11.0
- io.fabric8:kubernetes-model-extensions:jar:6.11.0
- io.fabric8:kubernetes-model-flowcontrol:jar:6.11.0
- io.fabric8:kubernetes-model-gatewayapi:jar:6.11.0
- io.fabric8:kubernetes-model-metrics:jar:6.11.0
- io.fabric8:kubernetes-model-networking:jar:6.11.0
- io.fabric8:kubernetes-model-node:jar:6.11.0
- io.fabric8:kubernetes-model-policy:jar:6.11.0
- io.fabric8:kubernetes-model-rbac:jar:6.11.0
- io.fabric8:kubernetes-model-resource:jar:6.11.0
- io.fabric8:kubernetes-model-scheduling:jar:6.11.0
- io.fabric8:kubernetes-model-storageclass:jar:6.11.0
- io.fabric8:kubernetes-client-api:jar:6.13.2
- io.fabric8:kubernetes-client:jar:6.13.2
- io.fabric8:kubernetes-httpclient-okhttp:jar:6.13.2
- io.fabric8:kubernetes-model-admissionregistration:jar:6.13.2
- io.fabric8:kubernetes-model-apiextensions:jar:6.13.2
- io.fabric8:kubernetes-model-apps:jar:6.13.2
- io.fabric8:kubernetes-model-autoscaling:jar:6.13.2
- io.fabric8:kubernetes-model-batch:jar:6.13.2
- io.fabric8:kubernetes-model-certificates:jar:6.13.2
- io.fabric8:kubernetes-model-common:jar:6.13.2
- io.fabric8:kubernetes-model-coordination:jar:6.13.2
- io.fabric8:kubernetes-model-core:jar:6.13.2
- io.fabric8:kubernetes-model-discovery:jar:6.13.2
- io.fabric8:kubernetes-model-events:jar:6.13.2
- io.fabric8:kubernetes-model-extensions:jar:6.13.2
- io.fabric8:kubernetes-model-flowcontrol:jar:6.13.2
- io.fabric8:kubernetes-model-gatewayapi:jar:6.13.2
- io.fabric8:kubernetes-model-metrics:jar:6.13.2
- io.fabric8:kubernetes-model-networking:jar:6.13.2
- io.fabric8:kubernetes-model-node:jar:6.13.2
- io.fabric8:kubernetes-model-policy:jar:6.13.2
- io.fabric8:kubernetes-model-rbac:jar:6.13.2
- io.fabric8:kubernetes-model-resource:jar:6.13.2
- io.fabric8:kubernetes-model-scheduling:jar:6.13.2
- io.fabric8:kubernetes-model-storageclass:jar:6.13.2
- io.fabric8:zjsonpatch:jar:0.3.0
- io.javaoperatorsdk:operator-framework-core:jar:4.8.3
- io.javaoperatorsdk:operator-framework:jar:4.8.3
Expand Down
Loading

0 comments on commit 39e10d0

Please sign in to comment.