diff --git a/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/ArmeriaHttpClientFactory.java b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/ArmeriaHttpClientFactory.java index 10141be6f3c..4a44a3ed353 100644 --- a/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/ArmeriaHttpClientFactory.java +++ b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/ArmeriaHttpClientFactory.java @@ -17,6 +17,8 @@ package com.linecorp.armeria.client.kubernetes; import com.linecorp.armeria.client.WebClientBuilder; +import com.linecorp.armeria.client.websocket.WebSocketClient; +import com.linecorp.armeria.client.websocket.WebSocketClientBuilder; import io.fabric8.kubernetes.client.http.HttpClient; @@ -36,4 +38,11 @@ public HttpClient.Builder newBuilder() { protected void additionalConfig(WebClientBuilder builder) { // no default implementation } + + /** + * Subclasses may use this to apply additional configuration for {@link WebSocketClient}. + */ + protected void additionalWebSocketConfig(WebSocketClientBuilder builder) { + // no default implementation + } } diff --git a/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/ArmeriaWebSocketClient.java b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/ArmeriaWebSocketClient.java index 656cc7cb9a3..11a60e4bd10 100644 --- a/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/ArmeriaWebSocketClient.java +++ b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/ArmeriaWebSocketClient.java @@ -29,6 +29,7 @@ import com.linecorp.armeria.client.RequestOptions; import com.linecorp.armeria.client.websocket.WebSocketClient; +import com.linecorp.armeria.client.websocket.WebSocketClientBuilder; import com.linecorp.armeria.client.websocket.WebSocketClientHandshakeException; import com.linecorp.armeria.client.websocket.WebSocketSession; import com.linecorp.armeria.common.HttpHeaderNames; @@ -69,10 +70,12 @@ private WebSocketClient webSocketClient() { if (webSocketClient0 != null) { return webSocketClient0; } - webSocketClient0 = WebSocketClient.builder() - .factory(armeriaHttpClientBuilder.clientFactory(true)) - .aggregateContinuation(true) - .build(); + final WebSocketClientBuilder webSocketClientBuilder = + WebSocketClient.builder() + .factory(armeriaHttpClientBuilder.clientFactory(true)) + .aggregateContinuation(true); + armeriaHttpClientBuilder.getClientFactory().additionalWebSocketConfig(webSocketClientBuilder); + webSocketClient0 = webSocketClientBuilder.build(); this.webSocketClient = webSocketClient0; return webSocketClient0; } finally { diff --git a/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroup.java b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroup.java index 3cd08c9c1ed..4554f3eb662 100644 --- a/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroup.java +++ b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroup.java @@ -23,18 +23,26 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import org.jctools.maps.NonBlockingHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.errorprone.annotations.concurrent.GuardedBy; + import com.linecorp.armeria.client.Endpoint; import com.linecorp.armeria.client.endpoint.DynamicEndpointGroup; import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy; +import com.linecorp.armeria.client.retry.Backoff; +import com.linecorp.armeria.common.CommonPools; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; import com.linecorp.armeria.common.util.ShutdownHooks; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import io.fabric8.kubernetes.api.model.Node; import io.fabric8.kubernetes.api.model.NodeAddress; @@ -103,6 +111,12 @@ public final class KubernetesEndpointGroup extends DynamicEndpointGroup { private static final Logger logger = LoggerFactory.getLogger(KubernetesEndpointGroup.class); + /** + * The debounce millis for the update of the endpoints. + * A short delay would be enough because the initial events are delivered sequentially. + */ + private static final int DEBOUNCE_MILLIS = 100; + private static final KubernetesClient DEFAULT_CLIENT = new KubernetesClientBuilder().build(); static { @@ -190,6 +204,9 @@ public static KubernetesEndpointGroupBuilder builder(Config kubeConfig) { return builder(new KubernetesClientBuilder().withConfig(kubeConfig).build(), true); } + // TODO(ikhoon): Consider a dedicated executor for the blocking tasks if necessary. + private final ScheduledExecutorService worker = CommonPools.blockingTaskExecutor(); + private final KubernetesClient client; private final boolean autoClose; @Nullable @@ -199,19 +216,29 @@ public static KubernetesEndpointGroupBuilder builder(Config kubeConfig) { private final String portName; private final Predicate nodeAddressFilter; - private final Watch nodeWatch; - private final Watch serviceWatch; + @Nullable + private volatile Watch nodeWatch; + @Nullable + private volatile Watch serviceWatch; @Nullable private volatile Watch podWatch; private final Map podToNode = new NonBlockingHashMap<>(); private final Map nodeToIp = new NonBlockingHashMap<>(); @Nullable - private Service service; + private volatile Service service; + @Nullable + private volatile Integer nodePort; + + private final ReentrantShortLock lock = new ReentrantShortLock(); + @GuardedBy("lock") @Nullable - private Integer nodePort; + private ScheduledFuture scheduledFuture; private volatile boolean closed; + private volatile int numServiceFailures; + private volatile int numNodeFailures; + private volatile int numPodFailures; KubernetesEndpointGroup(KubernetesClient client, @Nullable String namespace, String serviceName, @Nullable String portName, Predicate nodeAddressFilter, @@ -224,14 +251,39 @@ public static KubernetesEndpointGroupBuilder builder(Config kubeConfig) { this.portName = portName; this.nodeAddressFilter = nodeAddressFilter; this.autoClose = autoClose; - nodeWatch = watchNodes(); - serviceWatch = watchService(); + watchJob(this::watchNode); + watchJob(this::watchService); + } + + private void watchService() { + final Watch oldServiceWatch = serviceWatch; + if (oldServiceWatch != null) { + oldServiceWatch.close(); + } + + if (closed) { + return; + } + final Watch newServiceWatch; + try { + newServiceWatch = doWatchService(); + } catch (Exception e) { + logger.warn("[{}/{}] Failed to start the service watcher.", namespace, serviceName, e); + return; + } + // Recheck the closed flag because the doWatchService() method may take a while. + if (closed) { + newServiceWatch.close(); + } else { + serviceWatch = newServiceWatch; + logger.info("[{}/{}] Service watcher is started.", namespace, serviceName); + } } /** * Watches the service. {@link Watcher} will retry automatically on failures by {@link KubernetesClient}. */ - private Watch watchService() { + private Watch doWatchService() { final Watcher watcher = new Watcher() { @Override public void eventReceived(Action action, Service service0) { @@ -239,6 +291,7 @@ public void eventReceived(Action action, Service service0) { return; } + numServiceFailures = 0; switch (action) { case ADDED: case MODIFIED: @@ -265,21 +318,10 @@ public void eventReceived(Action action, Service service0) { service = service0; nodePort = nodePort0; - Watch podWatch0 = podWatch; - if (podWatch0 != null) { - podWatch0.close(); - } - // Clear the podToNode map before starting a new pod watch. - podToNode.clear(); - podWatch0 = watchPod(service0.getSpec().getSelector()); - if (closed) { - podWatch0.close(); - } else { - podWatch = podWatch0; - } + watchJob(() -> watchPod()); break; case DELETED: - logger.warn("{} service is deleted. (namespace: {})", serviceName, namespace); + logger.warn("[{}/{}] service is deleted.", namespace, serviceName); // This situation should not occur in production. break; case ERROR: @@ -294,7 +336,16 @@ public void onClose(WatcherException cause) { if (closed) { return; } - logger.warn("{} service watcher is closed. (namespace: {})", namespace, serviceName, cause); + logger.warn("[{}/{}] Service watcher is closed.", namespace, serviceName, cause); + logger.info("[{}/{}] Reconnecting the service watcher...", namespace, serviceName); + + // Immediately retry on the first failure. + watchJob(() -> watchService(), numServiceFailures++); + } + + @Override + public void onClose() { + logger.info("[{}/{}] Service watcher is closed gracefully.", namespace, serviceName); } }; @@ -305,20 +356,53 @@ public void onClose(WatcherException cause) { } } - private Watch watchPod(Map selector) { + private void watchPod() { + final Watch oldPodWatch = podWatch; + if (oldPodWatch != null) { + oldPodWatch.close(); + } + + if (closed) { + return; + } + final Watch newPodwatch; + try { + newPodwatch = doWatchPod(); + } catch (Exception e) { + logger.warn("[{}/{}] Failed to start the pod watcher.", namespace, serviceName, e); + return; + } + // Recheck the closed flag because the doWatchPod() method may take a while. + if (closed) { + newPodwatch.close(); + } else { + podWatch = newPodwatch; + logger.info("[{}/{}] Pod watcher is started.", namespace, serviceName); + } + } + + private Watch doWatchPod() { + // Clear the podToNode map before starting a new pod watch. + podToNode.clear(); final Watcher watcher = new Watcher() { @Override public void eventReceived(Action action, Pod resource) { if (closed) { return; } + + numPodFailures = 0; if (action == Action.ERROR || action == Action.BOOKMARK) { return; } final String podName = resource.getMetadata().getName(); final String nodeName = resource.getSpec().getNodeName(); + logger.debug("[{}/{}] Pod event received. action: {}, pod: {}, node: {}", + namespace, serviceName, action, podName, nodeName); + if (podName == null || nodeName == null) { - logger.debug("Pod or node name is null. pod: {}, node: {}", podName, nodeName); + logger.debug("[{}/{}] Pod or node name is null. pod: {}, node: {}", + namespace, serviceName, podName, nodeName); return; } @@ -332,7 +416,7 @@ public void eventReceived(Action action, Pod resource) { break; default: } - maybeUpdateEndpoints(); + maybeUpdateEndpoints(false); } @Override @@ -341,10 +425,22 @@ public void onClose(WatcherException cause) { return; } - logger.warn("Pod watcher for {}/{} is closed.", namespace, serviceName, cause); + logger.warn("[{}/{}] Pod watcher is closed.", namespace, serviceName, cause); + logger.info("[{}/{}] Reconnecting the pod watcher...", namespace, serviceName); + + watchJob(() -> watchPod(), numPodFailures++); + } + + @Override + public void onClose() { + logger.info("[{}/{}] Pod watcher is closed gracefully.", namespace, serviceName); } }; + final Service service = this.service; + assert service != null; + final Map selector = service.getSpec().getSelector(); + // watch() method will block until the watch connection is established. if (namespace == null) { return client.pods().withLabels(selector).watch(watcher); } else { @@ -352,10 +448,30 @@ public void onClose(WatcherException cause) { } } + private void watchNode() { + final Watch oldNodeWatch = nodeWatch; + if (oldNodeWatch != null) { + oldNodeWatch.close(); + } + + if (closed) { + return; + } + final Watch newNodeWatch = doWatchNode(); + // Recheck the closed flag because the doWatchNode() method may take a while. + if (closed) { + newNodeWatch.close(); + } else { + nodeWatch = newNodeWatch; + logger.info("[{}/{}] Node watcher is started.", namespace, serviceName); + } + } + /** * Fetches the internal IPs of the node. */ - private Watch watchNodes() { + private Watch doWatchNode() { + nodeToIp.clear(); final Watcher watcher = new Watcher() { @Override public void eventReceived(Action action, Node node) { @@ -363,11 +479,14 @@ public void eventReceived(Action action, Node node) { return; } + numNodeFailures = 0; if (action == Action.ERROR || action == Action.BOOKMARK) { return; } final String nodeName = node.getMetadata().getName(); + logger.debug("[{}/{}] Node event received. action: {}, node: {}", + namespace, serviceName, action, nodeName); switch (action) { case ADDED: case MODIFIED: @@ -376,7 +495,8 @@ public void eventReceived(Action action, Node node) { .map(NodeAddress::getAddress) .findFirst().orElse(null); if (nodeIp == null) { - logger.debug("No matching IP address is found in {}. node: {}", nodeName, node); + logger.debug("[{}/{}] No matching IP address is found in {}. node: {}", + namespace, serviceName, nodeName, node); nodeToIp.remove(nodeName); return; } @@ -386,9 +506,7 @@ public void eventReceived(Action action, Node node) { nodeToIp.remove(nodeName); break; } - // TODO(ikhoon): Reschedule the update after a certain delay since multiple websocket events - // are updated in a same task. - maybeUpdateEndpoints(); + maybeUpdateEndpoints(false); } @Override @@ -396,14 +514,44 @@ public void onClose(WatcherException cause) { if (closed) { return; } - logger.warn("Node watcher for {}/{} is closed.", namespace, serviceName, cause); + logger.warn("[{}/{}] Node watcher is closed.", namespace, serviceName, cause); + logger.info("[{}/{}] Reconnecting the node watcher...", namespace, serviceName); + watchJob(() -> watchNode(), numNodeFailures++); + } + + @Override + public void onClose() { + logger.info("[{}/{}] Node watcher is closed gracefully.", namespace, serviceName); } }; return client.nodes().watch(watcher); } - private void maybeUpdateEndpoints() { + private void watchJob(Runnable job) { + watchJob(job, 0); + } + + private void watchJob(Runnable job, int numAttempts) { + final Runnable safeRunnable = () -> { + try { + job.run(); + } catch (Exception e) { + logger.warn("[{}/{}] Failed to run a watch job.", namespace, serviceName, e); + } + }; + if (numAttempts == 0) { + worker.execute(safeRunnable); + } else { + worker.schedule(safeRunnable, Backoff.ofDefault().nextDelayMillis(numAttempts), + TimeUnit.MILLISECONDS); + } + } + + private void maybeUpdateEndpoints(boolean scheduledJob) { + if (closed) { + return; + } if (service == null) { // No event received for the service yet. return; @@ -419,6 +567,24 @@ private void maybeUpdateEndpoints() { return; } + lock.lock(); + try { + if (scheduledJob) { + scheduledFuture = null; + } else { + if (scheduledFuture != null) { + // A scheduled job is already scheduled. + return; + } + // Schedule a job to debounce the update of the endpoints. + scheduledFuture = worker.schedule(() -> maybeUpdateEndpoints(true), + DEBOUNCE_MILLIS, TimeUnit.MILLISECONDS); + return; + } + } finally { + lock.unlock(); + } + assert nodePort != null; final List endpoints = podToNode.values().stream() @@ -437,8 +603,14 @@ private void maybeUpdateEndpoints() { @Override protected void doCloseAsync(CompletableFuture future) { closed = true; - serviceWatch.close(); - nodeWatch.close(); + final Watch serviceWatch = this.serviceWatch; + if (serviceWatch != null) { + serviceWatch.close(); + } + final Watch nodeWatch = this.nodeWatch; + if (nodeWatch != null) { + nodeWatch.close(); + } final Watch podWatch = this.podWatch; if (podWatch != null) { podWatch.close(); diff --git a/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupFaultToleranceTest.java b/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupFaultToleranceTest.java new file mode 100644 index 00000000000..dcc326b6d3d --- /dev/null +++ b/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupFaultToleranceTest.java @@ -0,0 +1,106 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.kubernetes.endpoints; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.linecorp.armeria.client.kubernetes.endpoints.KubernetesEndpointGroupMockServerTest.newDeployment; +import static com.linecorp.armeria.client.kubernetes.endpoints.KubernetesEndpointGroupMockServerTest.newNode; +import static com.linecorp.armeria.client.kubernetes.endpoints.KubernetesEndpointGroupMockServerTest.newPod; +import static com.linecorp.armeria.client.kubernetes.endpoints.KubernetesEndpointGroupMockServerTest.newService; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; + +import org.junit.jupiter.api.Test; + +import com.google.common.collect.ImmutableList; + +import com.linecorp.armeria.client.Endpoint; + +import io.fabric8.kubernetes.api.model.Node; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; + +@EnableKubernetesMockClient( + crud = true, + kubernetesClientBuilderCustomizer = TestKubernetesClientBuilderCustomizer.class) +class KubernetesEndpointGroupFaultToleranceTest { + + private KubernetesClient client; + + @Test + void shouldReconnectOnWatcherException() throws InterruptedException { + // Prepare Kubernetes resources + final List nodes = ImmutableList.of(newNode("1.1.1.1"), newNode("2.2.2.2"), newNode("3.3.3.3")); + final Deployment deployment = newDeployment(); + final int nodePort = 30000; + final Service service = newService(nodePort); + final List pods = nodes.stream() + .map(node -> node.getMetadata().getName()) + .map(nodeName -> newPod(deployment.getSpec().getTemplate(), nodeName)) + .collect(toImmutableList()); + + // Create Kubernetes resources + for (Node node : nodes) { + client.nodes().resource(node).create(); + } + client.pods().resource(pods.get(0)).create(); + client.pods().resource(pods.get(1)).create(); + client.apps().deployments().resource(deployment).create(); + client.services().resource(service).create(); + + final KubernetesEndpointGroup endpointGroup = KubernetesEndpointGroup.of(client, "test", + "nginx-service"); + endpointGroup.whenReady().join(); + + // Initial state + await().untilAsserted(() -> { + final List endpoints = endpointGroup.endpoints(); + // Wait until all endpoints are ready + assertThat(endpoints).containsExactlyInAnyOrder( + Endpoint.of("1.1.1.1", nodePort), + Endpoint.of("2.2.2.2", nodePort) + ); + }); + + TestKubernetesClientBuilderCustomizer.injectFault(true); + // Add a new pod + client.pods().resource(pods.get(2)).create(); + + Thread.sleep(2000); + + assertThat(endpointGroup.endpoints()).containsExactlyInAnyOrder( + Endpoint.of("1.1.1.1", nodePort), + Endpoint.of("2.2.2.2", nodePort) + ); + + TestKubernetesClientBuilderCustomizer.injectFault(false); + // Make sure the new pod is added when the fault is recovered. + await().untilAsserted(() -> { + final List endpoints = endpointGroup.endpoints(); + assertThat(endpoints).containsExactlyInAnyOrder( + Endpoint.of("1.1.1.1", nodePort), + Endpoint.of("2.2.2.2", nodePort), + Endpoint.of("3.3.3.3", nodePort) + ); + }); + } +} diff --git a/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupMockServerTest.java b/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupMockServerTest.java index 4ea74410dfe..9cc6b7ac61c 100644 --- a/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupMockServerTest.java +++ b/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupMockServerTest.java @@ -22,7 +22,6 @@ import java.util.List; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; import com.google.common.collect.ImmutableList; @@ -66,17 +65,8 @@ @EnableKubernetesMockClient(crud = true) class KubernetesEndpointGroupMockServerTest { - private static KubernetesClient staticClient; - private KubernetesClient client; - @AfterAll - static void afterAll() { - // A workaround for the issue that the static client is leaked. - // Remove once https://github.com/fabric8io/kubernetes-client/pull/5854 is released. - staticClient.close(); - } - @Test void createEndpointsWithNodeIpAndPort() throws InterruptedException { // Prepare Kubernetes resources @@ -344,7 +334,7 @@ private static Node newNode(String ip, String type) { .build(); } - private static Node newNode(String ip) { + static Node newNode(String ip) { return newNode(ip, "InternalIP"); } @@ -413,7 +403,7 @@ private static PodTemplateSpec newPodTemplate(String selectorName) { .build(); } - private static Pod newPod(PodTemplateSpec template, String newNodeName) { + static Pod newPod(PodTemplateSpec template, String newNodeName) { final PodSpec spec = template.getSpec() .toBuilder() .withNodeName(newNodeName) diff --git a/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/TestKubernetesClientBuilderCustomizer.java b/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/TestKubernetesClientBuilderCustomizer.java new file mode 100644 index 00000000000..9afde6eb21a --- /dev/null +++ b/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/TestKubernetesClientBuilderCustomizer.java @@ -0,0 +1,67 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.kubernetes.endpoints; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.linecorp.armeria.client.kubernetes.ArmeriaHttpClientFactory; +import com.linecorp.armeria.client.websocket.WebSocketClientBuilder; +import com.linecorp.armeria.common.HttpData; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.websocket.WebSocketFrame; +import com.linecorp.armeria.internal.common.websocket.WebSocketFrameEncoder; + +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.fabric8.kubernetes.client.server.mock.KubernetesClientBuilderCustomizer; + +public class TestKubernetesClientBuilderCustomizer extends KubernetesClientBuilderCustomizer { + + private static final Logger logger = LoggerFactory.getLogger(TestKubernetesClientBuilderCustomizer.class); + + private static volatile boolean shouldInjectFault; + + static void injectFault(boolean shouldInjectFault) { + TestKubernetesClientBuilderCustomizer.shouldInjectFault = shouldInjectFault; + } + + @Override + public void accept(KubernetesClientBuilder kubernetesClientBuilder) { + kubernetesClientBuilder.withHttpClientFactory(new ArmeriaHttpClientFactory() { + + @Override + protected void additionalWebSocketConfig(WebSocketClientBuilder builder) { + builder.decorator((delegate, ctx, req) -> { + // Do something with the request. + final HttpResponse response = delegate.execute(ctx, req); + return response.mapData(object -> { + final HttpData newData; + if (shouldInjectFault) { + object.close(); + final WebSocketFrameEncoder encoder = WebSocketFrameEncoder.of(false); + final WebSocketFrame frame = WebSocketFrame.ofText("invalid data"); + newData = HttpData.wrap(encoder.encode(ctx, frame)); + } else { + newData = object; + } + return newData; + }); + }); + } + }); + } +}