From e1773e72bef71c96c1372c481f3ef49d9c433619 Mon Sep 17 00:00:00 2001 From: NiklasJonsson6 Date: Mon, 18 Dec 2023 15:48:11 +0100 Subject: [PATCH 01/24] exit quarkus on graceful watch onClose the same way as we do when the watch closes exceptionally. The graceful onClose is default-implemented with a debug log message so we had mistakenly not overridden it. --- .../kafka/keyvalue/kubernetes/EndpointsWatcher.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java index f36a593..71b738d 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java +++ b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java @@ -119,10 +119,16 @@ public void eventReceived(Action action, Endpoints resource) { @Override public void onClose(WatcherException cause) { // REVIEW what is a reasonable strategy here? - logger.warn("Exiting application due to watch closed"); + logger.warn("Exiting application due to watch exceptionally closed"); logger.error(cause.getMessage()); Quarkus.asyncExit(11); } + + @Override + public void onClose() { + logger.warn("Exiting application due to graceful watch closed"); + Quarkus.asyncExit(11); + } }); } From efb546dbebf75001d6b51e912745a2720158c16e Mon Sep 17 00:00:00 2001 From: NiklasJonsson6 Date: Mon, 18 Dec 2023 15:49:43 +0100 Subject: [PATCH 02/24] log debug level for kubernetes client We had no sign of graceful watcher onClose since it only logged a debug message in its default implementation --- src/main/resources/application.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 798603b..fdfd796 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -50,6 +50,8 @@ quarkus: level: DEBUG "org.apache.kafka.clients.Metadata": level: DEBUG + "io.fabric8.kubernetes.client": + level: DEBUG kafka: snappy: From fa6388d79b9c69848cd12e602412b0ac89fe2f22 Mon Sep 17 00:00:00 2001 From: Staffan Olsson Date: Mon, 18 Dec 2023 16:46:26 +0100 Subject: [PATCH 03/24] Quarkus 3.6.0->3.6.3 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 7859167..6e92e9b 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,7 @@ UTF-8 quarkus-bom io.quarkus.platform - 3.6.0 + 3.6.3 true 3.1.2 devservices From 8c7bf0a37b8180b40e6c07bca36dcc2dd8d7f071 Mon Sep 17 00:00:00 2001 From: Staffan Olsson Date: Mon, 18 Dec 2023 16:54:06 +0100 Subject: [PATCH 04/24] io.fabric8.kubernetes.client.vertx only logs errors --- src/main/resources/application.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index fdfd796..f33491f 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -52,6 +52,8 @@ quarkus: level: DEBUG "io.fabric8.kubernetes.client": level: DEBUG + "io.vertx.core.http": + level: DEBUG kafka: snappy: From 06cffc250ec4c590370f2471b3a50a21377c9b0a Mon Sep 17 00:00:00 2001 From: Staffan Olsson Date: Mon, 18 Dec 2023 17:18:10 +0100 Subject: [PATCH 05/24] Counts both success and failure so we can get an error rate --- .../webclient/UpdatesDispatcherWebclient.java | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git 
a/src/main/java/se/yolean/kafka/keyvalue/onupdate/webclient/UpdatesDispatcherWebclient.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/webclient/UpdatesDispatcherWebclient.java index e29787a..3cc0afb 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/onupdate/webclient/UpdatesDispatcherWebclient.java +++ b/src/main/java/se/yolean/kafka/keyvalue/onupdate/webclient/UpdatesDispatcherWebclient.java @@ -10,6 +10,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; import io.smallrye.mutiny.Uni; import io.vertx.core.buffer.Buffer; @@ -28,21 +29,29 @@ public class UpdatesDispatcherWebclient implements UpdatesDispatcher { EndpointsWatcher watcher; - private MeterRegistry registry; - @Inject UpdatesDispatcherWebclientConfig config; private final WebClient webClient; - static void initMetrics(MeterRegistry registry) { - registry.counter("kkv.target.update.failure").increment(0); + private Counter countFailures; + + private Counter countSuccess; + + void initMetrics(MeterRegistry registry) { + countFailures = Counter.builder("kkv.target.update.failure") + .description("Failures to confirm update of a target endpoint, after retries") + .register(registry); + countSuccess = Counter.builder("kkv.target.update.success") + .description("Confirm updates of a target endpoint, after retries") + .register(registry); + countFailures.increment(0); + countSuccess.increment(0); } @Inject public UpdatesDispatcherWebclient(Vertx vertx, MeterRegistry registry, EndpointsWatcher watcher) { this.webClient = WebClient.create(vertx); - this.registry = registry; this.watcher = watcher; initMetrics(registry); @@ -76,6 +85,7 @@ private void dispatch(UpdatesBodyPerTopic body, Map targets) { String name = entry.getValue(); dispatch(json, headers, ip, config.targetServicePort()).subscribe().with(item -> { + countSuccess.increment(); logger.info("Successfully sent update to {}", name); }, getDispatchFailureConsumer(name)); }); @@ -91,7 +101,7 @@ private Uni dispatch(JsonObject json, Map headers, String hos private Consumer getDispatchFailureConsumer(String name) { return (t) -> { - registry.counter("kkv.target.update.failure").increment(); + countFailures.increment(); logger.error("Failed to send update to " + name, t); }; } From 440a8cd4f871e2842bf8ef08b8f7c15ebfbbffb3 Mon Sep 17 00:00:00 2001 From: Staffan Olsson Date: Mon, 18 Dec 2023 17:19:07 +0100 Subject: [PATCH 06/24] Uses the same prefix as our other onupdate related metrics --- .../onupdate/webclient/UpdatesDispatcherWebclient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/se/yolean/kafka/keyvalue/onupdate/webclient/UpdatesDispatcherWebclient.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/webclient/UpdatesDispatcherWebclient.java index 3cc0afb..1c76913 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/onupdate/webclient/UpdatesDispatcherWebclient.java +++ b/src/main/java/se/yolean/kafka/keyvalue/onupdate/webclient/UpdatesDispatcherWebclient.java @@ -39,10 +39,10 @@ public class UpdatesDispatcherWebclient implements UpdatesDispatcher { private Counter countSuccess; void initMetrics(MeterRegistry registry) { - countFailures = Counter.builder("kkv.target.update.failure") + countFailures = Counter.builder("kkv.onupdate.failed") .description("Failures to confirm update of a target endpoint, after retries") .register(registry); - countSuccess = Counter.builder("kkv.target.update.success") + countSuccess = 
Counter.builder("kkv.onupdate.ok") .description("Confirm updates of a target endpoint, after retries") .register(registry); countFailures.increment(0); From 7389f628b1db819d5908b109edbb515d2b0096aa Mon Sep 17 00:00:00 2001 From: Staffan Olsson Date: Mon, 18 Dec 2023 17:26:53 +0100 Subject: [PATCH 07/24] Changed my mind: keep the prefix, means actual watch target update while "onupdate" refers to the mechanism of handling key-value updates This reverts commit 440a8cd4f871e2842bf8ef08b8f7c15ebfbbffb3. --- .../onupdate/webclient/UpdatesDispatcherWebclient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/se/yolean/kafka/keyvalue/onupdate/webclient/UpdatesDispatcherWebclient.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/webclient/UpdatesDispatcherWebclient.java index 1c76913..92ec98d 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/onupdate/webclient/UpdatesDispatcherWebclient.java +++ b/src/main/java/se/yolean/kafka/keyvalue/onupdate/webclient/UpdatesDispatcherWebclient.java @@ -39,10 +39,10 @@ public class UpdatesDispatcherWebclient implements UpdatesDispatcher { private Counter countSuccess; void initMetrics(MeterRegistry registry) { - countFailures = Counter.builder("kkv.onupdate.failed") + countFailures = Counter.builder("kkv.target.update.failure") .description("Failures to confirm update of a target endpoint, after retries") .register(registry); - countSuccess = Counter.builder("kkv.onupdate.ok") + countSuccess = Counter.builder("kkv.target.update.ok") .description("Confirm updates of a target endpoint, after retries") .register(registry); countFailures.increment(0); From cca2aed71626763c966e02e5625995c9bcf40649 Mon Sep 17 00:00:00 2001 From: Staffan Olsson Date: Mon, 18 Dec 2023 17:35:21 +0100 Subject: [PATCH 08/24] Fixes test with non-static init --- .../onupdate/webclient/UpdatesDispatcherWebclientTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/se/yolean/kafka/keyvalue/onupdate/webclient/UpdatesDispatcherWebclientTest.java b/src/test/java/se/yolean/kafka/keyvalue/onupdate/webclient/UpdatesDispatcherWebclientTest.java index d9e64d0..6309686 100644 --- a/src/test/java/se/yolean/kafka/keyvalue/onupdate/webclient/UpdatesDispatcherWebclientTest.java +++ b/src/test/java/se/yolean/kafka/keyvalue/onupdate/webclient/UpdatesDispatcherWebclientTest.java @@ -75,8 +75,8 @@ void testTargetUpdateFailureMetric() { var registry = new SimpleMeterRegistry(); assertEquals("", registry.getMetersAsString()); - UpdatesDispatcherWebclient.initMetrics(registry); - assertEquals("kkv.target.update.failure(COUNTER)[]; count=0.0", registry.getMetersAsString()); + dispatcher.initMetrics(registry); + assertEquals("kkv.target.update.failure(COUNTER)[]; count=0.0\nkkv.target.update.ok(COUNTER)[]; count=0.0", registry.getMetersAsString()); } From 98abe802a4bb4f4cc5a47795177cb560b9635c21 Mon Sep 17 00:00:00 2001 From: Staffan Olsson Date: Wed, 27 Dec 2023 09:59:19 +0100 Subject: [PATCH 09/24] Adds reconnect logging --- .../yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java index 71b738d..853c0d7 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java +++ b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java @@ -116,6 +116,13 @@ public void eventReceived(Action action, 
Endpoints resource) { handleEvent(action, resource); } + @Override + public boolean reconnecting() { + boolean reconnecting = Watcher.super.reconnecting(); + logger.warn("Watcher reconnecting {}", reconnecting); + return reconnecting; + } + @Override public void onClose(WatcherException cause) { // REVIEW what is a reasonable strategy here? From 252f46d7eb236ffbcb9d704dd4ededa381fe70d7 Mon Sep 17 00:00:00 2001 From: Staffan Olsson Date: Wed, 27 Dec 2023 10:24:12 +0100 Subject: [PATCH 10/24] should only be needed for trace level, but why don't we get logs? --- src/main/resources/application.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index f33491f..f5ef0d5 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -52,8 +52,10 @@ quarkus: level: DEBUG "io.fabric8.kubernetes.client": level: DEBUG + min-level: DEBUG "io.vertx.core.http": level: DEBUG + min-level: DEBUG kafka: snappy: From f53c8441c01f0ae8d35034317a5f60ff6061f126 Mon Sep 17 00:00:00 2001 From: Staffan Olsson Date: Wed, 27 Dec 2023 12:52:15 +0100 Subject: [PATCH 11/24] Quarkus 3.6.4 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 6e92e9b..06b661c 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,7 @@ UTF-8 quarkus-bom io.quarkus.platform - 3.6.3 + 3.6.4 true 3.1.2 devservices From 9031c9930720da578f08329331c3b4903915e1cf Mon Sep 17 00:00:00 2001 From: Staffan Olsson Date: Wed, 27 Dec 2023 12:54:07 +0100 Subject: [PATCH 12/24] Don't cache jvm builds, they're cheap --- skaffold.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/skaffold.yaml b/skaffold.yaml index 61ff586..ad96ee4 100644 --- a/skaffold.yaml +++ b/skaffold.yaml @@ -15,7 +15,7 @@ build: # buildArgs: # build: package custom: - buildCommand: y-build --opt target=jvm --opt build-arg:build=package + buildCommand: EXPORT_CACHE=false y-build --opt target=jvm --opt build-arg:build=package dependencies: dockerfile: path: ./Dockerfile From c55b61fe23aba32fd1d86e5c9e40c70343b71d4b Mon Sep 17 00:00:00 2001 From: Staffan Olsson Date: Wed, 27 Dec 2023 16:45:56 +0100 Subject: [PATCH 13/24] Avoids exit, allows the client lib to retry --- .../keyvalue/kubernetes/EndpointsWatcher.java | 36 +++++++++++++++---- .../kubernetes/EndpointsWatcherTest.java | 9 ++--- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java index 853c0d7..fe209ca 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java +++ b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java @@ -21,6 +21,9 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; import io.fabric8.kubernetes.client.Watcher.Action; import io.quarkus.runtime.Quarkus; import io.quarkus.runtime.StartupEvent; @@ -41,8 +44,13 @@ public class EndpointsWatcher { @Inject KubernetesClient client; + private boolean healthUnknown = true; + private Counter countEvent; + private Counter countReconnect; + private Counter countClose; + @Inject - public EndpointsWatcher(EndpointsWatcherConfig config) { + public 
EndpointsWatcher(EndpointsWatcherConfig config, MeterRegistry registry) { if (config.targetServiceName().isPresent()) { watchEnabled = true; targetServiceName = config.targetServiceName().orElseThrow(); @@ -50,6 +58,16 @@ public EndpointsWatcher(EndpointsWatcherConfig config) { watchEnabled = false; targetServiceName = null; } + Gauge.builder("kkv.watcher.health.unknown", () -> { + if (healthUnknown) return 1.0; + return 0.0; + }).register(registry); + countEvent = Counter.builder("kkv.watcher.event").register(registry); + countReconnect = Counter.builder("kkv.watcher.reconnect").register(registry); + countClose = Counter.builder("kkv.watcher.close").register(registry); + countEvent.increment(0); + countReconnect.increment(0); + countClose.increment(0); } void start(@Observes StartupEvent ev) { @@ -113,11 +131,15 @@ private void watch() { client.endpoints().withName(targetServiceName).watch(new Watcher() { @Override public void eventReceived(Action action, Endpoints resource) { + healthUnknown = false; + countEvent.increment(); handleEvent(action, resource); } @Override public boolean reconnecting() { + healthUnknown = true; + countReconnect.increment(); boolean reconnecting = Watcher.super.reconnecting(); logger.warn("Watcher reconnecting {}", reconnecting); return reconnecting; @@ -125,16 +147,16 @@ public boolean reconnecting() { @Override public void onClose(WatcherException cause) { - // REVIEW what is a reasonable strategy here? - logger.warn("Exiting application due to watch exceptionally closed"); - logger.error(cause.getMessage()); - Quarkus.asyncExit(11); + healthUnknown = true; + countClose.increment(); + logger.error("Watch closed with error", cause); } @Override public void onClose() { - logger.warn("Exiting application due to graceful watch closed"); - Quarkus.asyncExit(11); + healthUnknown = true; + countClose.increment(); + logger.info("Watch closed"); } }); } diff --git a/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java b/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java index 10c9e9e..a00525a 100644 --- a/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java +++ b/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java @@ -25,6 +25,7 @@ import io.fabric8.kubernetes.client.Watcher.Action; import io.fabric8.kubernetes.client.dsl.MixedOperation; import io.fabric8.kubernetes.client.dsl.Resource; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import se.yolean.kafka.keyvalue.UpdateRecord; import se.yolean.kafka.keyvalue.onupdate.UpdatesBodyPerTopicJSON; @@ -52,7 +53,7 @@ public Optional targetServiceName() { return Optional.empty(); } - }); + }, new SimpleMeterRegistry()); watcher.client = mock(KubernetesClient.class); when(watcher.client.endpoints()).thenReturn(mixedOperationMock); @@ -78,7 +79,7 @@ public Optional targetServiceName() { return Optional.of("target-service-name"); } - }); + }, new SimpleMeterRegistry()); watcher.client = mock(KubernetesClient.class); when(watcher.client.endpoints()).thenReturn(mixedOperationMock); @@ -100,7 +101,7 @@ public Optional targetServiceName() { return Optional.empty(); } - }); + }, new SimpleMeterRegistry()); List notReadyAddresses = List.of( createEndpoint("192.168.0.1", "pod1"), @@ -141,7 +142,7 @@ public Optional targetServiceName() { return Optional.empty(); } - }); + }, new SimpleMeterRegistry()); List notReadyAddresses = List.of( createEndpoint("192.168.0.1", "pod1"), From 509399df255d03f7fe4fb63a0b5c653b7ddf8aab Mon 
Sep 17 00:00:00 2001 From: Staffan Olsson Date: Wed, 27 Dec 2023 18:19:21 +0100 Subject: [PATCH 14/24] Reconnect on close, as we're not using an informer --- .../keyvalue/kubernetes/EndpointsWatcher.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java index fe209ca..b7da6ac 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java +++ b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java @@ -25,7 +25,6 @@ import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; import io.fabric8.kubernetes.client.Watcher.Action; -import io.quarkus.runtime.Quarkus; import io.quarkus.runtime.StartupEvent; import se.yolean.kafka.keyvalue.onupdate.UpdatesBodyPerTopic; @@ -44,6 +43,8 @@ public class EndpointsWatcher { @Inject KubernetesClient client; + private Watcher watcher = null; + private boolean healthUnknown = true; private Counter countEvent; private Counter countReconnect; @@ -128,7 +129,13 @@ void handleEvent(Action action, Endpoints resource) { } private void watch() { - client.endpoints().withName(targetServiceName).watch(new Watcher() { + if (watcher == null) createWatcher(); + logger.info("Starting watch"); + client.endpoints().withName(targetServiceName).watch(watcher); + } + + private void createWatcher() { + watcher = new Watcher() { @Override public void eventReceived(Action action, Endpoints resource) { healthUnknown = false; @@ -150,6 +157,7 @@ public void onClose(WatcherException cause) { healthUnknown = true; countClose.increment(); logger.error("Watch closed with error", cause); + watch(); } @Override @@ -157,8 +165,9 @@ public void onClose() { healthUnknown = true; countClose.increment(); logger.info("Watch closed"); + watch(); } - }); + }; } private void emitPendingUpdatesToNowReadyTarget(EndpointAddress address, List updates) { From 14fb31a27ba2ad77a76266c94057d500d51b2d1a Mon Sep 17 00:00:00 2001 From: NiklasJonsson6 Date: Wed, 24 Jan 2024 10:07:26 +0100 Subject: [PATCH 15/24] Quarkus 3.6.7 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 06b661c..a6c4404 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,7 @@ UTF-8 quarkus-bom io.quarkus.platform - 3.6.4 + 3.6.7 true 3.1.2 devservices From d99be4e5520428149df413b1426cc3a7cd92e645 Mon Sep 17 00:00:00 2001 From: NiklasJonsson6 Date: Wed, 24 Jan 2024 10:10:16 +0100 Subject: [PATCH 16/24] integration test for endpoints watch reconnect behavior using testcontainers with k3s --- pom.xml | 19 ++++ .../WatcherReconnectIntegrationTest.java | 104 ++++++++++++++++++ .../testresources/InjectK3sContainer.java | 11 ++ .../testresources/K3sTestResource.java | 42 +++++++ .../KubernetesClientProducer.java | 41 +++++++ src/test/resources/application.yaml | 2 + 6 files changed, 219 insertions(+) create mode 100644 src/test/java/se/yolean/kafka/keyvalue/kubernetes/WatcherReconnectIntegrationTest.java create mode 100644 src/test/java/se/yolean/kafka/keyvalue/testresources/InjectK3sContainer.java create mode 100644 src/test/java/se/yolean/kafka/keyvalue/testresources/K3sTestResource.java create mode 100644 src/test/java/se/yolean/kafka/keyvalue/testresources/KubernetesClientProducer.java diff --git a/pom.xml b/pom.xml index a6c4404..ae488c6 100644 --- a/pom.xml +++ b/pom.xml @@ -61,6 +61,25 @@ io.quarkus quarkus-kubernetes-client + + + 
org.bouncycastle + bcprov-jdk18on + 1.77 + test + + + org.bouncycastle + bcpkix-jdk18on + 1.77 + test + + + + org.testcontainers + k3s + test + io.quarkus quarkus-junit5 diff --git a/src/test/java/se/yolean/kafka/keyvalue/kubernetes/WatcherReconnectIntegrationTest.java b/src/test/java/se/yolean/kafka/keyvalue/kubernetes/WatcherReconnectIntegrationTest.java new file mode 100644 index 0000000..4137c58 --- /dev/null +++ b/src/test/java/se/yolean/kafka/keyvalue/kubernetes/WatcherReconnectIntegrationTest.java @@ -0,0 +1,104 @@ +package se.yolean.kafka.keyvalue.kubernetes; + +import static org.junit.Assert.assertEquals; + +import java.time.Duration; +import java.util.Map; +import java.util.function.Predicate; +import java.util.function.Supplier; + +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.k3s.K3sContainer; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.micrometer.core.instrument.MeterRegistry; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; +import io.quarkus.test.junit.TestProfile; +import io.restassured.RestAssured; +import io.smallrye.mutiny.Uni; +import jakarta.inject.Inject; +import se.yolean.kafka.keyvalue.testresources.InjectK3sContainer; +import se.yolean.kafka.keyvalue.testresources.K3sTestResource; + +@Tag("devservices") +@QuarkusTest +@QuarkusTestResource(value = K3sTestResource.class, restrictToAnnotatedClass = true) +@TestProfile(WatcherReconnectIntegrationTest.EndpointsWatcherEnabledTestProfile.class) +public class WatcherReconnectIntegrationTest { + + private static final Logger logger = LoggerFactory.getLogger(WatcherReconnectIntegrationTest.class); + + @Inject + MeterRegistry registry; + + @InjectK3sContainer + K3sContainer k3s; + + @Inject + KubernetesClient client; + + @Test + void test() { + assertEquals(0, (int) registry.counter("kkv.watcher.close").count()); + performAndWaitOrThrow("Waiting for readiness", () -> { + return RestAssured.get("/q/health/ready").andReturn(); + }, res -> res.statusCode() == 200); + + performAndWaitOrThrow("Waiting for EndpointsWatcher to pick up endpoints", () -> { + return registry.find("kkv.watcher.health.unknown").gauge().value(); + }, res -> res.equals(0d)); + + logger.info("Restarting k3s"); + k3s.getDockerClient().restartContainerCmd(k3s.getContainerId()).exec(); + + performAndWaitOrThrow("Waiting for EndpointsWatcher to lose its watch", () -> { + return registry.find("kkv.watcher.health.unknown").gauge().value(); + }, res -> res.equals(1d)); + + performAndWaitOrThrow("Waiting for the api server to respond after the restart", () -> { + return client.namespaces().list().toString(); + }, res -> res != null); + + // A simple way to cause events is to rollout restart some deployment + client.apps().deployments().inNamespace("kube-system").withName("metrics-server").rolling().restart(); + performAndWaitOrThrow("Waiting for EndpointsWatcher to recover and pick up endpoints changes", () -> { + return registry.find("kkv.watcher.health.unknown").gauge().value(); + }, res -> res.equals(0d)); + } + + /** + * This pattern is reused every time we wait for the application to react to some change + */ + private T performAndWaitOrThrow(String name, Supplier action, Predicate waitUntil) { + logger.debug(name); + return Uni.createFrom().item(() -> { + T result = action.get(); + boolean ok = waitUntil.test(result); + if (!ok) { + 
throw new RuntimeException(String.format("Result did not fulfill predicate for action \"%s\"", name)); + } + logger.debug("Action \"{}\" completed successfully", name); + return result; + }).onFailure() + .retry() + .withBackOff(Duration.ofSeconds(1), Duration.ofSeconds(1)) + .atMost(30) + .await() + .indefinitely(); + } + + public static class EndpointsWatcherEnabledTestProfile implements QuarkusTestProfile { + + @Override + public Map getConfigOverrides() { + return Map.of("kkv.target.service.name", "metrics-server"); + } + + } + +} diff --git a/src/test/java/se/yolean/kafka/keyvalue/testresources/InjectK3sContainer.java b/src/test/java/se/yolean/kafka/keyvalue/testresources/InjectK3sContainer.java new file mode 100644 index 0000000..083fbde --- /dev/null +++ b/src/test/java/se/yolean/kafka/keyvalue/testresources/InjectK3sContainer.java @@ -0,0 +1,11 @@ +package se.yolean.kafka.keyvalue.testresources; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.lang.annotation.ElementType; +import java.lang.annotation.RetentionPolicy; + +@Target({ ElementType.FIELD }) +@Retention(RetentionPolicy.RUNTIME) +public @interface InjectK3sContainer { +} diff --git a/src/test/java/se/yolean/kafka/keyvalue/testresources/K3sTestResource.java b/src/test/java/se/yolean/kafka/keyvalue/testresources/K3sTestResource.java new file mode 100644 index 0000000..827beb3 --- /dev/null +++ b/src/test/java/se/yolean/kafka/keyvalue/testresources/K3sTestResource.java @@ -0,0 +1,42 @@ +package se.yolean.kafka.keyvalue.testresources; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.testcontainers.k3s.K3sContainer; +import org.testcontainers.utility.DockerImageName; + +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +public class K3sTestResource implements QuarkusTestResourceLifecycleManager { + + K3sContainer k3s; + String configYaml; + + @Override + public Map start() { + k3s = new K3sContainer(DockerImageName.parse("rancher/k3s:v1.27.9-k3s1")); + k3s.setPortBindings(List.of("6443:6443")); + k3s.start(); + + configYaml = k3s.getKubeConfigYaml(); + + System.setProperty("test.kubeconfig", configYaml); + Map props = new HashMap<>(); + props.put("test.kubeconfig", configYaml); + return props; + } + + @Override + public void stop() { + k3s.stop(); + } + + @Override + public void inject(TestInjector testInjector) { + testInjector.injectIntoFields(this.k3s, + new TestInjector.AnnotatedAndMatchesType(InjectK3sContainer.class, K3sContainer.class)); + } + +} diff --git a/src/test/java/se/yolean/kafka/keyvalue/testresources/KubernetesClientProducer.java b/src/test/java/se/yolean/kafka/keyvalue/testresources/KubernetesClientProducer.java new file mode 100644 index 0000000..4619732 --- /dev/null +++ b/src/test/java/se/yolean/kafka/keyvalue/testresources/KubernetesClientProducer.java @@ -0,0 +1,41 @@ +package se.yolean.kafka.keyvalue.testresources; + +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.fabric8.kubernetes.client.utils.KubernetesSerialization; +import io.quarkus.arc.profile.IfBuildProfile; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.inject.Produces; +import jakarta.inject.Singleton; + +@Singleton +public class KubernetesClientProducer { + + private KubernetesClient client; + + @Produces + @Singleton + @IfBuildProfile("test") + public KubernetesClient 
kubernetesClient(KubernetesSerialization serialization) { + if (client == null) { + String yamlConfig = System.getProperty("test.kubeconfig"); + + Config config = Config.fromKubeconfig(yamlConfig); + client = new KubernetesClientBuilder() + .withKubernetesSerialization(serialization) + .withConfig(config) + .build(); + } + + return client; + } + + @PreDestroy + public void destroy() { + if (client != null) { + client.close(); + } + } + +} diff --git a/src/test/resources/application.yaml b/src/test/resources/application.yaml index 5a11ebd..560e219 100644 --- a/src/test/resources/application.yaml +++ b/src/test/resources/application.yaml @@ -1,6 +1,8 @@ '%test': quarkus: kafka: + snappy: + enabled: false devservices: topic-partitions: mytopic: 3 From e58855d71c184d8f76a1e53e239434cce64db178 Mon Sep 17 00:00:00 2001 From: NiklasJonsson6 Date: Wed, 24 Jan 2024 10:12:17 +0100 Subject: [PATCH 17/24] EndpointsWatcher retries on connection closed --- .../keyvalue/kubernetes/EndpointsWatcher.java | 41 +++++++++++++++++-- .../kubernetes/EndpointsWatcherConfig.java | 7 +++- src/main/resources/application.yaml | 2 + .../kubernetes/EndpointsWatcherTest.java | 20 +++++++++ src/test/resources/application.yaml | 5 +++ 5 files changed, 69 insertions(+), 6 deletions(-) diff --git a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java index b7da6ac..3fd176a 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java +++ b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java @@ -19,6 +19,7 @@ import io.fabric8.kubernetes.api.model.EndpointAddress; import io.fabric8.kubernetes.api.model.Endpoints; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; import io.micrometer.core.instrument.Counter; @@ -26,6 +27,7 @@ import io.micrometer.core.instrument.MeterRegistry; import io.fabric8.kubernetes.client.Watcher.Action; import io.quarkus.runtime.StartupEvent; +import io.vertx.mutiny.core.Vertx; import se.yolean.kafka.keyvalue.onupdate.UpdatesBodyPerTopic; @ApplicationScoped @@ -39,11 +41,16 @@ public class EndpointsWatcher { private final String targetServiceName; private final boolean watchEnabled; + private final int watchRestartDelaySeconds; @Inject KubernetesClient client; + @Inject + Vertx vertx; + private Watcher watcher = null; + private Watch watch = null; private boolean healthUnknown = true; private Counter countEvent; @@ -52,6 +59,7 @@ public class EndpointsWatcher { @Inject public EndpointsWatcher(EndpointsWatcherConfig config, MeterRegistry registry) { + this.watchRestartDelaySeconds = config.watchRestartDelaySeconds(); if (config.targetServiceName().isPresent()) { watchEnabled = true; targetServiceName = config.targetServiceName().orElseThrow(); @@ -131,7 +139,32 @@ void handleEvent(Action action, Endpoints resource) { private void watch() { if (watcher == null) createWatcher(); logger.info("Starting watch"); - client.endpoints().withName(targetServiceName).watch(watcher); + watch = client.endpoints().withName(targetServiceName).watch(watcher); + } + + private void retryWatch() { + if (watch != null) { + var vwatch = watch; + watch = null; + vwatch.close(); + } + + double jitter = Math.random() * 0.2 + 1; + long delay = Double.valueOf(jitter * watchRestartDelaySeconds * 1000).longValue(); + + logger.info("Retrying watch in {} 
seconds ({}ms)", delay / 1000, delay); + vertx.setTimer(delay, id -> { + vertx.executeBlocking(() -> { + logger.info("Retrying watch..."); + watch(); + return null; + }).subscribe().with(item -> { + logger.info("Successfully reconnected"); + }, e -> { + logger.error("Failed to watch, trying again", e); + retryWatch(); + }); + }); } private void createWatcher() { @@ -148,7 +181,7 @@ public boolean reconnecting() { healthUnknown = true; countReconnect.increment(); boolean reconnecting = Watcher.super.reconnecting(); - logger.warn("Watcher reconnecting {}", reconnecting); + logger.warn("Watcher reconnecting. \"If the Watcher can reconnect itself after an error\": {}", reconnecting); return reconnecting; } @@ -157,7 +190,7 @@ public void onClose(WatcherException cause) { healthUnknown = true; countClose.increment(); logger.error("Watch closed with error", cause); - watch(); + retryWatch(); } @Override @@ -165,7 +198,7 @@ public void onClose() { healthUnknown = true; countClose.increment(); logger.info("Watch closed"); - watch(); + retryWatch(); } }; } diff --git a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherConfig.java b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherConfig.java index 42cba12..c1fddb5 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherConfig.java +++ b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherConfig.java @@ -5,10 +5,13 @@ import io.smallrye.config.ConfigMapping; import io.smallrye.config.WithName; -@ConfigMapping(prefix = "kkv.target.service") +@ConfigMapping(prefix = "kkv") public interface EndpointsWatcherConfig { - @WithName("name") + @WithName("target.service.name") public Optional targetServiceName(); + @WithName("endpoints-watcher.watch-restart-delay-seconds") + public Integer watchRestartDelaySeconds(); + } diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index f5ef0d5..a6d6a78 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -1,4 +1,6 @@ kkv: + endpoints-watcher: + watch-restart-delay-seconds: ${WATCH_RESTART_DELAY_SECONDS:30} target: path: ${ON_UPDATE_PATH:/kafka-keyvalue/v1/updates} service: diff --git a/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java b/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java index a00525a..d48bbd3 100644 --- a/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java +++ b/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java @@ -53,6 +53,11 @@ public Optional targetServiceName() { return Optional.empty(); } + @Override + public Integer watchRestartDelaySeconds() { + return 1; + } + }, new SimpleMeterRegistry()); watcher.client = mock(KubernetesClient.class); when(watcher.client.endpoints()).thenReturn(mixedOperationMock); @@ -79,6 +84,11 @@ public Optional targetServiceName() { return Optional.of("target-service-name"); } + @Override + public Integer watchRestartDelaySeconds() { + return 1; + } + }, new SimpleMeterRegistry()); watcher.client = mock(KubernetesClient.class); when(watcher.client.endpoints()).thenReturn(mixedOperationMock); @@ -101,6 +111,11 @@ public Optional targetServiceName() { return Optional.empty(); } + @Override + public Integer watchRestartDelaySeconds() { + return 1; + } + }, new SimpleMeterRegistry()); List notReadyAddresses = List.of( @@ -142,6 +157,11 @@ public Optional targetServiceName() { return Optional.empty(); } + @Override + public Integer 
watchRestartDelaySeconds() { + return 1; + } + }, new SimpleMeterRegistry()); List notReadyAddresses = List.of( diff --git a/src/test/resources/application.yaml b/src/test/resources/application.yaml index 560e219..43cf6b5 100644 --- a/src/test/resources/application.yaml +++ b/src/test/resources/application.yaml @@ -1,4 +1,9 @@ '%test': + kkv: + endpoints-watcher: + # Quick retries for unit tests + watch-restart-delay-seconds: 1 + quarkus: kafka: snappy: From c37fa239937bb1fb064e0b8ca256c55366d51c82 Mon Sep 17 00:00:00 2001 From: NiklasJonsson6 Date: Wed, 24 Jan 2024 16:57:52 +0100 Subject: [PATCH 18/24] configure min- and max-retry delay and select a random value between to use --- .../keyvalue/kubernetes/EndpointsWatcher.java | 31 ++++++- .../kubernetes/EndpointsWatcherConfig.java | 8 +- src/main/resources/application.yaml | 3 +- .../kubernetes/EndpointsWatcherTest.java | 87 +++++++++++++++++-- src/test/resources/application.yaml | 3 +- 5 files changed, 116 insertions(+), 16 deletions(-) diff --git a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java index 3fd176a..3ba164d 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java +++ b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java @@ -1,5 +1,6 @@ package se.yolean.kafka.keyvalue.kubernetes; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -41,7 +42,8 @@ public class EndpointsWatcher { private final String targetServiceName; private final boolean watchEnabled; - private final int watchRestartDelaySeconds; + private final Duration watchRestartDelayMin; + private final Duration watchRestartDelayMax; @Inject KubernetesClient client; @@ -59,7 +61,8 @@ public class EndpointsWatcher { @Inject public EndpointsWatcher(EndpointsWatcherConfig config, MeterRegistry registry) { - this.watchRestartDelaySeconds = config.watchRestartDelaySeconds(); + this.watchRestartDelayMin = config.watchRestartDelayMin(); + this.watchRestartDelayMax = config.watchRestartDelayMax(); if (config.targetServiceName().isPresent()) { watchEnabled = true; targetServiceName = config.targetServiceName().orElseThrow(); @@ -149,8 +152,7 @@ private void retryWatch() { vwatch.close(); } - double jitter = Math.random() * 0.2 + 1; - long delay = Double.valueOf(jitter * watchRestartDelaySeconds * 1000).longValue(); + long delay = getRetryDelayMs(); logger.info("Retrying watch in {} seconds ({}ms)", delay / 1000, delay); vertx.setTimer(delay, id -> { @@ -167,6 +169,27 @@ private void retryWatch() { }); } + /** + * @return A random value between the maximum and minimum retry delay duration + */ + long getRetryDelayMs() { + return getRetryDelayMs(Math.random()); + } + + /** + * @param x A value between 0 and 1. Its the x in kx+m where k=(max-min) and m=min. 
+ * @return A random value between the maximum and minimum retry delay duration + */ + long getRetryDelayMs(double x) { + if (x < 0 || x > 1) { + throw new RuntimeException("x must be between 0 and 1!"); + } + long minMs = watchRestartDelayMin.toMillis(); + long maxMs = watchRestartDelayMax.toMillis(); + + return Double.valueOf((maxMs - minMs) * x + minMs).longValue(); + } + private void createWatcher() { watcher = new Watcher() { @Override diff --git a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherConfig.java b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherConfig.java index c1fddb5..91e2cc9 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherConfig.java +++ b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherConfig.java @@ -1,5 +1,6 @@ package se.yolean.kafka.keyvalue.kubernetes; +import java.time.Duration; import java.util.Optional; import io.smallrye.config.ConfigMapping; @@ -11,7 +12,10 @@ public interface EndpointsWatcherConfig { @WithName("target.service.name") public Optional targetServiceName(); - @WithName("endpoints-watcher.watch-restart-delay-seconds") - public Integer watchRestartDelaySeconds(); + @WithName("endpoints-watcher.watch-restart-delay-min") + public Duration watchRestartDelayMin(); + + @WithName("endpoints-watcher.watch-restart-delay-max") + public Duration watchRestartDelayMax(); } diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index a6d6a78..3776e14 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -1,6 +1,7 @@ kkv: endpoints-watcher: - watch-restart-delay-seconds: ${WATCH_RESTART_DELAY_SECONDS:30} + watch-restart-delay-min: ${WATCH_RESTART_DELAY_MIN:1ms} + watch-restart-delay-max: ${WATCH_RESTART_DELAY_MAX:60s} target: path: ${ON_UPDATE_PATH:/kafka-keyvalue/v1/updates} service: diff --git a/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java b/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java index d48bbd3..09c61bb 100644 --- a/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java +++ b/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java @@ -7,6 +7,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -54,8 +55,13 @@ public Optional targetServiceName() { } @Override - public Integer watchRestartDelaySeconds() { - return 1; + public Duration watchRestartDelayMin() { + return Duration.ofSeconds(1); + } + + @Override + public Duration watchRestartDelayMax() { + return Duration.ofSeconds(1); } }, new SimpleMeterRegistry()); @@ -85,8 +91,13 @@ public Optional targetServiceName() { } @Override - public Integer watchRestartDelaySeconds() { - return 1; + public Duration watchRestartDelayMin() { + return Duration.ofSeconds(1); + } + + @Override + public Duration watchRestartDelayMax() { + return Duration.ofSeconds(1); } }, new SimpleMeterRegistry()); @@ -112,8 +123,13 @@ public Optional targetServiceName() { } @Override - public Integer watchRestartDelaySeconds() { - return 1; + public Duration watchRestartDelayMin() { + return Duration.ofSeconds(1); + } + + @Override + public Duration watchRestartDelayMax() { + return Duration.ofSeconds(1); } }, new SimpleMeterRegistry()); @@ -158,8 +174,13 @@ public Optional targetServiceName() { } @Override - public Integer 
watchRestartDelaySeconds() {
-      return 1;
+    public Duration watchRestartDelayMin() {
+      return Duration.ofSeconds(1);
+    }
+
+    @Override
+    public Duration watchRestartDelayMax() {
+      return Duration.ofSeconds(1);
+    }
     }, new SimpleMeterRegistry());
@@ -217,4 +238,54 @@ public Integer watchRestartDelaySeconds() {
     assertEquals(Map.of("192.168.0.1", "pod1", "192.168.0.3", "pod3"), watcher.getTargets());
   }
 
+  @Test
+  void testGetRetryDelayMs() {
+    EndpointsWatcher watcher = new EndpointsWatcher(new EndpointsWatcherConfig() {
+
+      @Override
+      public Optional targetServiceName() {
+        return Optional.empty();
+      }
+
+      @Override
+      public Duration watchRestartDelayMin() {
+        return Duration.ofSeconds(1);
+      }
+
+      @Override
+      public Duration watchRestartDelayMax() {
+        return Duration.ofSeconds(10);
+      }
+
+    }, new SimpleMeterRegistry());
+
+    assertEquals(1000, watcher.getRetryDelayMs(0));
+    assertEquals(5.5 * 1000, watcher.getRetryDelayMs(0.5));
+    assertEquals(7.3 * 1000, watcher.getRetryDelayMs(0.7));
+    assertEquals(10 * 1000, watcher.getRetryDelayMs(1));
+
+    watcher = new EndpointsWatcher(new EndpointsWatcherConfig() {
+
+      @Override
+      public Optional targetServiceName() {
+        return Optional.empty();
+      }
+
+      @Override
+      public Duration watchRestartDelayMin() {
+        return Duration.ofMillis(1);
+      }
+
+      @Override
+      public Duration watchRestartDelayMax() {
+        return Duration.ofSeconds(60);
+      }
+
+    }, new SimpleMeterRegistry());
+    assertEquals(1, watcher.getRetryDelayMs(0));
+    assertEquals(30 * 1000, watcher.getRetryDelayMs(0.5));
+    assertEquals(42 * 1000, watcher.getRetryDelayMs(0.7));
+    assertEquals(60 * 1000, watcher.getRetryDelayMs(1));
+  }
+
 }
diff --git a/src/test/resources/application.yaml b/src/test/resources/application.yaml
index 43cf6b5..5b5d4c7 100644
--- a/src/test/resources/application.yaml
+++ b/src/test/resources/application.yaml
@@ -1,4 +1,9 @@
 '%test':
+  kkv:
+    endpoints-watcher:
+      # Quick retries for unit tests
+      watch-restart-delay-min: 1s
+      watch-restart-delay-max: 1s
   quarkus:
     kafka:
       snappy:

From e30605ea927c3b04757563c4bc49fea06e9463f7 Mon Sep 17 00:00:00 2001
From: NiklasJonsson6
Date: Wed, 31 Jan 2024 13:47:35 +0100
Subject: [PATCH 19/24] plain watch replaced with informer

the informer handles reconnects internally, which makes it unnecessary
for us to write our own logic for that.
Also, the replaced watch reconnect behaviour we wrote ourselves was not perfect and regularly re-watched without a need to do so --- .../keyvalue/kubernetes/EndpointsWatcher.java | 151 +++++++----------- .../kubernetes/EndpointsWatcherConfig.java | 11 +- src/main/resources/application.yaml | 4 +- .../kubernetes/EndpointsWatcherTest.java | 102 ++++-------- .../WatcherReconnectIntegrationTest.java | 61 ++++--- src/test/resources/application.yaml | 5 +- 6 files changed, 144 insertions(+), 190 deletions(-) diff --git a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java index 3ba164d..52e2c55 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java +++ b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java @@ -20,9 +20,8 @@ import io.fabric8.kubernetes.api.model.EndpointAddress; import io.fabric8.kubernetes.api.model.Endpoints; import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.Watch; -import io.fabric8.kubernetes.client.Watcher; -import io.fabric8.kubernetes.client.WatcherException; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; @@ -41,9 +40,9 @@ public class EndpointsWatcher { private List>> onTargetReadyConsumers = new ArrayList<>(); private final String targetServiceName; + private final String namespace; + private final Duration resyncPeriod; private final boolean watchEnabled; - private final Duration watchRestartDelayMin; - private final Duration watchRestartDelayMax; @Inject KubernetesClient client; @@ -51,8 +50,7 @@ public class EndpointsWatcher { @Inject Vertx vertx; - private Watcher watcher = null; - private Watch watch = null; + private SharedIndexInformer informer; private boolean healthUnknown = true; private Counter countEvent; @@ -61,8 +59,9 @@ public class EndpointsWatcher { @Inject public EndpointsWatcher(EndpointsWatcherConfig config, MeterRegistry registry) { - this.watchRestartDelayMin = config.watchRestartDelayMin(); - this.watchRestartDelayMax = config.watchRestartDelayMax(); + this.namespace = config.namespace(); + this.resyncPeriod = config.resyncPeriod(); + config.resyncPeriod(); if (config.targetServiceName().isPresent()) { watchEnabled = true; targetServiceName = config.targetServiceName().orElseThrow(); @@ -85,18 +84,26 @@ public EndpointsWatcher(EndpointsWatcherConfig config, MeterRegistry registry) { void start(@Observes StartupEvent ev) { logger.info("EndpointsWatcher onStart"); if (watchEnabled) { - watch(); + inform(); } else { logger.info("No target service name configured, EndpointsWatcher is disabled"); } } + public boolean isWatching() { + return informer != null && informer.isWatching(); + } + public void addOnReadyConsumer(BiConsumer> consumer) { onTargetReadyConsumers.add(consumer); } void handleEvent(Action action, Endpoints resource) { logger.debug("endpoints watch received action: {}", action.toString()); + if (action.equals(Action.DELETED)) { + clearEndpoints(); + return; + } endpoints = resource.getSubsets().stream() .map(subset -> subset.getAddresses()) @@ -139,91 +146,57 @@ void handleEvent(Action action, Endpoints resource) { logger.info("Set new targets: {}", mapEndpointsToTargets(endpoints)); } - private void watch() { - if (watcher == 
null) createWatcher(); - logger.info("Starting watch"); - watch = client.endpoints().withName(targetServiceName).watch(watcher); - } - - private void retryWatch() { - if (watch != null) { - var vwatch = watch; - watch = null; - vwatch.close(); - } - - long delay = getRetryDelayMs(); - - logger.info("Retrying watch in {} seconds ({}ms)", delay / 1000, delay); - vertx.setTimer(delay, id -> { - vertx.executeBlocking(() -> { - logger.info("Retrying watch..."); - watch(); - return null; - }).subscribe().with(item -> { - logger.info("Successfully reconnected"); - }, e -> { - logger.error("Failed to watch, trying again", e); - retryWatch(); - }); - }); - } - /** - * @return A random value between the maximum and minimum retry delay duration + * Clears the collections of (unready or not) endpoints. + * + * REVIEW Since we're only watching a single "Endpoints" (collection of + * addresses), a delete event means that we should clear all endpoints. If we + * change from watching "Endpoints" to "EndpointSlices" as suggested in + * https://kubernetes.io/docs/concepts/services-networking/service/#endpoints we + * cannot know that the slice contains all addresses which would make this + * method more complex. */ - long getRetryDelayMs() { - return getRetryDelayMs(Math.random()); + private void clearEndpoints() { + unreadyEndpoints.clear(); + endpoints.clear(); + logger.info("Cleared all targets"); } - /** - * @param x A value between 0 and 1. Its the x in kx+m where k=(max-min) and m=min. - * @return A random value between the maximum and minimum retry delay duration - */ - long getRetryDelayMs(double x) { - if (x < 0 || x > 1) { - throw new RuntimeException("x must be between 0 and 1!"); - } - long minMs = watchRestartDelayMin.toMillis(); - long maxMs = watchRestartDelayMax.toMillis(); - - return Double.valueOf((maxMs - minMs) * x + minMs).longValue(); + private void inform() { + if (informer == null) createInformer(); + logger.info("Started informer"); } - private void createWatcher() { - watcher = new Watcher() { - @Override - public void eventReceived(Action action, Endpoints resource) { - healthUnknown = false; - countEvent.increment(); - handleEvent(action, resource); - } - - @Override - public boolean reconnecting() { - healthUnknown = true; - countReconnect.increment(); - boolean reconnecting = Watcher.super.reconnecting(); - logger.warn("Watcher reconnecting. \"If the Watcher can reconnect itself after an error\": {}", reconnecting); - return reconnecting; - } - - @Override - public void onClose(WatcherException cause) { - healthUnknown = true; - countClose.increment(); - logger.error("Watch closed with error", cause); - retryWatch(); - } - - @Override - public void onClose() { - healthUnknown = true; - countClose.increment(); - logger.info("Watch closed"); - retryWatch(); - } - }; + private void createInformer() { + informer = client.endpoints() + .inNamespace(namespace) + .withName(targetServiceName) + .inform( + new ResourceEventHandler() { + + @Override + public void onAdd(Endpoints obj) { + countEvent.increment(); + handleEvent(Action.ADDED, obj); + } + + @Override + public void onUpdate(Endpoints oldObj, Endpoints newObj) { + countEvent.increment(); + handleEvent(Action.MODIFIED, newObj); + } + + @Override + public void onDelete(Endpoints obj, boolean deletedFinalStateUnknown) { + // The "Endpoints" object we watch contains all endpoints for the target + // service. Delete only happens if the service itself is deleted, never when + // individual endpoint-addresses change. 
+ logger.warn("Endpoints onDelete {}", obj); + handleEvent(Action.DELETED, obj); + } + + }, resyncPeriod.toMillis() + ); } private void emitPendingUpdatesToNowReadyTarget(EndpointAddress address, List updates) { diff --git a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherConfig.java b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherConfig.java index 91e2cc9..7ff2b6f 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherConfig.java +++ b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherConfig.java @@ -9,13 +9,14 @@ @ConfigMapping(prefix = "kkv") public interface EndpointsWatcherConfig { + @WithName("namespace") + public String namespace(); + @WithName("target.service.name") public Optional targetServiceName(); - @WithName("endpoints-watcher.watch-restart-delay-min") - public Duration watchRestartDelayMin(); - - @WithName("endpoints-watcher.watch-restart-delay-max") - public Duration watchRestartDelayMax(); + /** @return How often the informer rebuilds it cache. */ + @WithName("endpoints-watcher.resync-period") + public Duration resyncPeriod(); } diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 3776e14..5c5cea9 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -1,7 +1,7 @@ kkv: + namespace: ${NAMESPACE} endpoints-watcher: - watch-restart-delay-min: ${WATCH_RESTART_DELAY_MIN:1ms} - watch-restart-delay-max: ${WATCH_RESTART_DELAY_MAX:60s} + resync-period: ${INFORMER_RESYNC_PERIOD:5m} target: path: ${ON_UPDATE_PATH:/kafka-keyvalue/v1/updates} service: diff --git a/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java b/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java index 09c61bb..5358e11 100644 --- a/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java +++ b/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java @@ -25,7 +25,9 @@ import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher.Action; import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; import io.fabric8.kubernetes.client.dsl.Resource; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import se.yolean.kafka.keyvalue.UpdateRecord; import se.yolean.kafka.keyvalue.onupdate.UpdatesBodyPerTopicJSON; @@ -55,13 +57,13 @@ public Optional targetServiceName() { } @Override - public Duration watchRestartDelayMin() { - return Duration.ofSeconds(1); + public String namespace() { + return "dev"; } @Override - public Duration watchRestartDelayMax() { - return Duration.ofSeconds(1); + public Duration resyncPeriod() { + return Duration.ofMinutes(5); } }, new SimpleMeterRegistry()); @@ -76,13 +78,16 @@ public Duration watchRestartDelayMax() { } @Test + @SuppressWarnings("unchecked") public void watchEnabledTest() { interface MixedOperationMock extends MixedOperation> {} MixedOperation> mixedOperationMock = mock(MixedOperationMock.class); interface ResourceMock extends Resource {} Resource resourceMock = mock(ResourceMock.class); - when(mixedOperationMock.withName(any())).thenReturn(resourceMock); - when(resourceMock.watch(any())).thenReturn(mock(Watch.class)); + NonNamespaceOperation> nonNamespaceOperationMock = mock(NonNamespaceOperation.class); + 
when(mixedOperationMock.inNamespace(any())).thenReturn(nonNamespaceOperationMock); + when(nonNamespaceOperationMock.withName(any())).thenReturn(resourceMock); + when(resourceMock.inform(any())).thenReturn(mock(SharedIndexInformer.class)); var watcher = new EndpointsWatcher(new EndpointsWatcherConfig() { @Override @@ -91,13 +96,13 @@ public Optional targetServiceName() { } @Override - public Duration watchRestartDelayMin() { - return Duration.ofSeconds(1); + public String namespace() { + return "dev"; } @Override - public Duration watchRestartDelayMax() { - return Duration.ofSeconds(1); + public Duration resyncPeriod() { + return Duration.ofMinutes(5); } }, new SimpleMeterRegistry()); @@ -106,11 +111,14 @@ public Duration watchRestartDelayMax() { watcher.start(null); - ArgumentCaptor targetServiceNameCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor resyncPeriodCaptor = ArgumentCaptor.forClass(Long.class); + // ArgumentCaptor targetServiceNameCaptor = ArgumentCaptor.forClass(String.class); verify(watcher.client, times(1)).endpoints(); - verify(mixedOperationMock, times(1)).withName(targetServiceNameCaptor.capture()); - assertEquals("target-service-name", targetServiceNameCaptor.getValue()); - verify(resourceMock, times(1)).watch(any()); + verify(nonNamespaceOperationMock, times(1)).withName("target-service-name"); + verify(mixedOperationMock, times(1)).inNamespace("dev"); + // assertEquals("target-service-name", targetServiceNameCaptor.getValue()); + verify(resourceMock, times(1)).inform(any(), resyncPeriodCaptor.capture()); + assertEquals(5 * 60 * 1000, resyncPeriodCaptor.getValue()); } @Test @@ -123,13 +131,13 @@ public Optional targetServiceName() { } @Override - public Duration watchRestartDelayMin() { - return Duration.ofSeconds(1); + public String namespace() { + return "dev"; } @Override - public Duration watchRestartDelayMax() { - return Duration.ofSeconds(1); + public Duration resyncPeriod() { + return Duration.ofMinutes(5); } }, new SimpleMeterRegistry()); @@ -174,13 +182,13 @@ public Optional targetServiceName() { } @Override - public Duration watchRestartDelayMin() { - return Duration.ofSeconds(1); + public String namespace() { + return "dev"; } @Override - public Duration watchRestartDelayMax() { - return Duration.ofSeconds(1); + public Duration resyncPeriod() { + return Duration.ofMinutes(5); } }, new SimpleMeterRegistry()); @@ -238,54 +246,4 @@ public Duration watchRestartDelayMax() { assertEquals(Map.of("192.168.0.1", "pod1", "192.168.0.3", "pod3"), watcher.getTargets()); } - @Test - void testGetRetryDelayMs() { - EndpointsWatcher watcher = new EndpointsWatcher(new EndpointsWatcherConfig() { - - @Override - public Optional targetServiceName() { - return Optional.empty(); - } - - @Override - public Duration watchRestartDelayMin() { - return Duration.ofSeconds(1); - } - - @Override - public Duration watchRestartDelayMax() { - return Duration.ofSeconds(10); - } - - }, new SimpleMeterRegistry()); - - assertEquals(1000, watcher.getRetryDelayMs(0)); - assertEquals(5.5 * 1000, watcher.getRetryDelayMs(0.5)); - assertEquals(7.3 * 1000, watcher.getRetryDelayMs(0.7)); - assertEquals(10 * 1000, watcher.getRetryDelayMs(1)); - - watcher = new EndpointsWatcher(new EndpointsWatcherConfig() { - - @Override - public Optional targetServiceName() { - return Optional.empty(); - } - - @Override - public Duration watchRestartDelayMin() { - return Duration.ofMillis(1); - } - - @Override - public Duration watchRestartDelayMax() { - return Duration.ofSeconds(60); - } - - }, new 
SimpleMeterRegistry()); - assertEquals(1, watcher.getRetryDelayMs(0)); - assertEquals(30 * 1000, watcher.getRetryDelayMs(0.5)); - assertEquals(42 * 1000, watcher.getRetryDelayMs(0.7)); - assertEquals(60 * 1000, watcher.getRetryDelayMs(1)); - } - } diff --git a/src/test/java/se/yolean/kafka/keyvalue/kubernetes/WatcherReconnectIntegrationTest.java b/src/test/java/se/yolean/kafka/keyvalue/kubernetes/WatcherReconnectIntegrationTest.java index 4137c58..5152041 100644 --- a/src/test/java/se/yolean/kafka/keyvalue/kubernetes/WatcherReconnectIntegrationTest.java +++ b/src/test/java/se/yolean/kafka/keyvalue/kubernetes/WatcherReconnectIntegrationTest.java @@ -3,6 +3,8 @@ import static org.junit.Assert.assertEquals; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.function.Predicate; import java.util.function.Supplier; @@ -42,52 +44,69 @@ public class WatcherReconnectIntegrationTest { @Inject KubernetesClient client; + @Inject + EndpointsWatcherConfig config; + + @Inject + EndpointsWatcher watcher; + @Test void test() { assertEquals(0, (int) registry.counter("kkv.watcher.close").count()); performAndWaitOrThrow("Waiting for readiness", () -> { return RestAssured.get("/q/health/ready").andReturn(); - }, res -> res.statusCode() == 200); + }, res -> res.statusCode() == 200, 10); - performAndWaitOrThrow("Waiting for EndpointsWatcher to pick up endpoints", () -> { - return registry.find("kkv.watcher.health.unknown").gauge().value(); - }, res -> res.equals(0d)); + logger.info("initial targets: {}", watcher.getTargets()); + performAndWaitOrThrow("Waiting for the pod to be ready so that EndpointsWatcher picks up the target", () -> { + return watcher.getTargets(); + }, (targets) -> targets.size() == 1, 25); - logger.info("Restarting k3s"); + logger.info("Restarting k3s to validate reconnect behavior"); k3s.getDockerClient().restartContainerCmd(k3s.getContainerId()).exec(); - - performAndWaitOrThrow("Waiting for EndpointsWatcher to lose its watch", () -> { - return registry.find("kkv.watcher.health.unknown").gauge().value(); - }, res -> res.equals(1d)); - performAndWaitOrThrow("Waiting for the api server to respond after the restart", () -> { return client.namespaces().list().toString(); - }, res -> res != null); + }, res -> res != null, 5); + performAndWaitOrThrow("Waiting for the informer to recover", () -> watcher.isWatching(), watching -> watching, 5); + performAndWaitOrThrow("Waiting for EndpointsWatcher to find target", () -> watcher.getTargets(), (targets) -> { + return targets.size() == 1; + }, 5); // A simple way to cause events is to rollout restart some deployment + logger.info("Restarting deployment to provoke endpoint changes"); client.apps().deployments().inNamespace("kube-system").withName("metrics-server").rolling().restart(); - performAndWaitOrThrow("Waiting for EndpointsWatcher to recover and pick up endpoints changes", () -> { - return registry.find("kkv.watcher.health.unknown").gauge().value(); - }, res -> res.equals(0d)); + performAndWaitOrThrow("Waiting for the informer to lose old targets", () -> watcher.getTargets(), (targets) -> { + return targets.size() == 0; + }, 15); + performAndWaitOrThrow("Waiting for the new target to appear", () -> watcher.getTargets(), (list) -> { + return list.size() == 1; + }, 60); + + client.services().inNamespace("kube-system").withName("metrics-server").delete(); + performAndWaitOrThrow("Waiting for endpoints to be deleted", () -> watcher.getTargets(), (list) -> { + return list.size() == 
0; + }, 5); } /** * This pattern is reused every time we wait for the application to react to some change */ - private T performAndWaitOrThrow(String name, Supplier action, Predicate waitUntil) { + private T performAndWaitOrThrow(String name, Supplier action, Predicate waitUntil, int attempts) { logger.debug(name); + List att = new ArrayList<>(); return Uni.createFrom().item(() -> { + att.add(true); T result = action.get(); boolean ok = waitUntil.test(result); if (!ok) { throw new RuntimeException(String.format("Result did not fulfill predicate for action \"%s\"", name)); } - logger.debug("Action \"{}\" completed successfully", name); + logger.debug("Action \"{}\" completed successfully after {} attempts", name, att.size()); return result; }).onFailure() .retry() - .withBackOff(Duration.ofSeconds(1), Duration.ofSeconds(1)) - .atMost(30) + .withBackOff(Duration.ofSeconds(1), Duration.ofSeconds(2)) + .atMost(attempts) .await() .indefinitely(); } @@ -96,7 +115,11 @@ public static class EndpointsWatcherEnabledTestProfile implements QuarkusTestPro @Override public Map getConfigOverrides() { - return Map.of("kkv.target.service.name", "metrics-server"); + // Here we find existing pods in the k3s cluster + return Map.of( + "kkv.target.service.name", "metrics-server", + "kkv.namespace", "kube-system" + ); } } diff --git a/src/test/resources/application.yaml b/src/test/resources/application.yaml index 5b5d4c7..8bf0889 100644 --- a/src/test/resources/application.yaml +++ b/src/test/resources/application.yaml @@ -1,9 +1,8 @@ '%test': kkv: + namespace: dev endpoints-watcher: - # Quick retries for unit tests - watch-restart-delay-min: 1s - watch-restart-delay-max: 1s + resync-period: 5m quarkus: kafka: From 310720c0223fc6b472a861e7dd215d1e5c9a3efa Mon Sep 17 00:00:00 2001 From: NiklasJonsson6 Date: Wed, 31 Jan 2024 13:49:01 +0100 Subject: [PATCH 20/24] add synchronized to handleEvent method we modify two collections in sequence and the method is invoked from arbitrary thread-pool threads --- .../se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java index 52e2c55..b8dcac8 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java +++ b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java @@ -98,7 +98,7 @@ public void addOnReadyConsumer(BiConsumer Date: Wed, 31 Jan 2024 16:02:14 +0100 Subject: [PATCH 21/24] config property and metrics fix --- .../keyvalue/kubernetes/EndpointsWatcher.java | 16 ++++------------ .../kubernetes/EndpointsWatcherConfig.java | 8 ++++---- src/main/resources/application.yaml | 5 ++--- .../kubernetes/EndpointsWatcherTest.java | 8 ++++---- .../WatcherReconnectIntegrationTest.java | 6 +++--- src/test/resources/application.yaml | 7 ++++--- 6 files changed, 21 insertions(+), 29 deletions(-) diff --git a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java index b8dcac8..d799344 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java +++ b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java @@ -52,16 +52,12 @@ public class EndpointsWatcher { private SharedIndexInformer informer; - private boolean healthUnknown = true; private Counter countEvent; - private Counter countReconnect; - private Counter 
countClose; @Inject public EndpointsWatcher(EndpointsWatcherConfig config, MeterRegistry registry) { this.namespace = config.namespace(); - this.resyncPeriod = config.resyncPeriod(); - config.resyncPeriod(); + this.resyncPeriod = config.informerResyncPeriod(); if (config.targetServiceName().isPresent()) { watchEnabled = true; targetServiceName = config.targetServiceName().orElseThrow(); @@ -69,16 +65,12 @@ public EndpointsWatcher(EndpointsWatcherConfig config, MeterRegistry registry) { watchEnabled = false; targetServiceName = null; } - Gauge.builder("kkv.watcher.health.unknown", () -> { - if (healthUnknown) return 1.0; - return 0.0; + Gauge.builder("kkv.watcher.down", () -> { + if (isWatching()) return 0.0; + return 1.0; }).register(registry); countEvent = Counter.builder("kkv.watcher.event").register(registry); - countReconnect = Counter.builder("kkv.watcher.reconnect").register(registry); - countClose = Counter.builder("kkv.watcher.close").register(registry); countEvent.increment(0); - countReconnect.increment(0); - countClose.increment(0); } void start(@Observes StartupEvent ev) { diff --git a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherConfig.java b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherConfig.java index 7ff2b6f..4e92b2d 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherConfig.java +++ b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherConfig.java @@ -6,17 +6,17 @@ import io.smallrye.config.ConfigMapping; import io.smallrye.config.WithName; -@ConfigMapping(prefix = "kkv") +@ConfigMapping(prefix = "kkv.target.service") public interface EndpointsWatcherConfig { @WithName("namespace") public String namespace(); - @WithName("target.service.name") + @WithName("name") public Optional targetServiceName(); /** @return How often the informer rebuilds it cache. 
*/ - @WithName("endpoints-watcher.resync-period") - public Duration resyncPeriod(); + @WithName("informer-resync-period") + public Duration informerResyncPeriod(); } diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 5c5cea9..ff5badd 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -1,12 +1,11 @@ kkv: - namespace: ${NAMESPACE} - endpoints-watcher: - resync-period: ${INFORMER_RESYNC_PERIOD:5m} target: path: ${ON_UPDATE_PATH:/kafka-keyvalue/v1/updates} service: name: ${TARGET_SERVICE_NAME:} port: ${TARGET_SERVICE_PORT:8080} + namespace: ${TARGET_SERVICE_NAMESPACE} + informer-resync-period: ${INFORMER_RESYNC_PERIOD:5m} static: host: ${TARGET_STATIC_HOST:} port: ${TARGET_STATIC_PORT:8080} diff --git a/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java b/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java index 5358e11..1640b74 100644 --- a/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java +++ b/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java @@ -62,7 +62,7 @@ public String namespace() { } @Override - public Duration resyncPeriod() { + public Duration informerResyncPeriod() { return Duration.ofMinutes(5); } @@ -101,7 +101,7 @@ public String namespace() { } @Override - public Duration resyncPeriod() { + public Duration informerResyncPeriod() { return Duration.ofMinutes(5); } @@ -136,7 +136,7 @@ public String namespace() { } @Override - public Duration resyncPeriod() { + public Duration informerResyncPeriod() { return Duration.ofMinutes(5); } @@ -187,7 +187,7 @@ public String namespace() { } @Override - public Duration resyncPeriod() { + public Duration informerResyncPeriod() { return Duration.ofMinutes(5); } diff --git a/src/test/java/se/yolean/kafka/keyvalue/kubernetes/WatcherReconnectIntegrationTest.java b/src/test/java/se/yolean/kafka/keyvalue/kubernetes/WatcherReconnectIntegrationTest.java index 5152041..5ace035 100644 --- a/src/test/java/se/yolean/kafka/keyvalue/kubernetes/WatcherReconnectIntegrationTest.java +++ b/src/test/java/se/yolean/kafka/keyvalue/kubernetes/WatcherReconnectIntegrationTest.java @@ -1,6 +1,6 @@ package se.yolean.kafka.keyvalue.kubernetes; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.time.Duration; import java.util.ArrayList; @@ -52,10 +52,10 @@ public class WatcherReconnectIntegrationTest { @Test void test() { - assertEquals(0, (int) registry.counter("kkv.watcher.close").count()); performAndWaitOrThrow("Waiting for readiness", () -> { return RestAssured.get("/q/health/ready").andReturn(); }, res -> res.statusCode() == 200, 10); + assertTrue(RestAssured.get("/q/metrics").andReturn().asString().contains("kkv_watcher_down 0.0")); logger.info("initial targets: {}", watcher.getTargets()); performAndWaitOrThrow("Waiting for the pod to be ready so that EndpointsWatcher picks up the target", () -> { @@ -118,7 +118,7 @@ public Map getConfigOverrides() { // Here we find existing pods in the k3s cluster return Map.of( "kkv.target.service.name", "metrics-server", - "kkv.namespace", "kube-system" + "kkv.target.service.namespace", "kube-system" ); } diff --git a/src/test/resources/application.yaml b/src/test/resources/application.yaml index 8bf0889..5fd656f 100644 --- a/src/test/resources/application.yaml +++ b/src/test/resources/application.yaml @@ -1,8 +1,9 @@ '%test': kkv: - namespace: dev - endpoints-watcher: - resync-period: 5m + 
target: + service: + namespace: dev + informer-resync-period: 5m quarkus: kafka: From 0821f64cd76ec4ea51dba9d151cb4103928349c6 Mon Sep 17 00:00:00 2001 From: NiklasJonsson6 Date: Wed, 31 Jan 2024 16:02:24 +0100 Subject: [PATCH 22/24] quarkus 3.6.8 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index ae488c6..ea34403 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,7 @@ UTF-8 quarkus-bom io.quarkus.platform - 3.6.7 + 3.6.8 true 3.1.2 devservices From 7f6854a77235d3eb89cfef5e7a2bc992520b369a Mon Sep 17 00:00:00 2001 From: NiklasJonsson6 Date: Wed, 31 Jan 2024 17:37:49 +0100 Subject: [PATCH 23/24] require targetServiceNamespace if informer is enabled --- .../keyvalue/kubernetes/EndpointsWatcher.java | 9 ++++++--- .../kubernetes/EndpointsWatcherConfig.java | 2 +- src/main/resources/application.yaml | 2 +- .../kubernetes/EndpointsWatcherTest.java | 16 ++++++++-------- 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java index d799344..2097fb4 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java +++ b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcher.java @@ -40,7 +40,7 @@ public class EndpointsWatcher { private List>> onTargetReadyConsumers = new ArrayList<>(); private final String targetServiceName; - private final String namespace; + private final String targetServiceNamespace; private final Duration resyncPeriod; private final boolean watchEnabled; @@ -56,14 +56,17 @@ public class EndpointsWatcher { @Inject public EndpointsWatcher(EndpointsWatcherConfig config, MeterRegistry registry) { - this.namespace = config.namespace(); this.resyncPeriod = config.informerResyncPeriod(); if (config.targetServiceName().isPresent()) { watchEnabled = true; targetServiceName = config.targetServiceName().orElseThrow(); + targetServiceNamespace = config.targetServiceNamespace().orElseThrow(() -> { + return new RuntimeException("target service namespace is required when watch is enabled"); + }); } else { watchEnabled = false; targetServiceName = null; + targetServiceNamespace = null; } Gauge.builder("kkv.watcher.down", () -> { if (isWatching()) return 0.0; @@ -161,7 +164,7 @@ private void inform() { private void createInformer() { informer = client.endpoints() - .inNamespace(namespace) + .inNamespace(targetServiceNamespace) .withName(targetServiceName) .inform( new ResourceEventHandler() { diff --git a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherConfig.java b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherConfig.java index 4e92b2d..4cc2272 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherConfig.java +++ b/src/main/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherConfig.java @@ -10,7 +10,7 @@ public interface EndpointsWatcherConfig { @WithName("namespace") - public String namespace(); + public Optional targetServiceNamespace(); @WithName("name") public Optional targetServiceName(); diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index ff5badd..f61cb27 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -4,7 +4,7 @@ kkv: service: name: ${TARGET_SERVICE_NAME:} port: ${TARGET_SERVICE_PORT:8080} - namespace: ${TARGET_SERVICE_NAMESPACE} + namespace: ${TARGET_SERVICE_NAMESPACE:} informer-resync-period: 
${INFORMER_RESYNC_PERIOD:5m} static: host: ${TARGET_STATIC_HOST:} diff --git a/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java b/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java index 1640b74..b3c1b7f 100644 --- a/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java +++ b/src/test/java/se/yolean/kafka/keyvalue/kubernetes/EndpointsWatcherTest.java @@ -57,8 +57,8 @@ public Optional targetServiceName() { } @Override - public String namespace() { - return "dev"; + public Optional targetServiceNamespace() { + return Optional.of("dev"); } @Override @@ -96,8 +96,8 @@ public Optional targetServiceName() { } @Override - public String namespace() { - return "dev"; + public Optional targetServiceNamespace() { + return Optional.of("dev"); } @Override @@ -131,8 +131,8 @@ public Optional targetServiceName() { } @Override - public String namespace() { - return "dev"; + public Optional targetServiceNamespace() { + return Optional.of("dev"); } @Override @@ -182,8 +182,8 @@ public Optional targetServiceName() { } @Override - public String namespace() { - return "dev"; + public Optional targetServiceNamespace() { + return Optional.of("dev"); } @Override From d3ecc7af8ab92f489223ac1164e1a426cbcbd4a3 Mon Sep 17 00:00:00 2001 From: Staffan Olsson Date: Thu, 1 Feb 2024 06:41:06 +0100 Subject: [PATCH 24/24] Builder with latest Mandrel and JRE runtime with java 21.0.2 --- Dockerfile | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Dockerfile b/Dockerfile index 6c55e5e..66ebfbf 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM --platform=$TARGETPLATFORM docker.io/yolean/builder-quarkus:9e78afadc64b01ef70c00add6039f3f31e1c2542@sha256:c8fba1c08a1af43a7038f3337fe2af98aec15a7cf31c3dc12c39ade540c827f7 \ +FROM --platform=$TARGETPLATFORM docker.io/yolean/builder-quarkus:394fc7c4a84a1b54cf9558b44a6607911c01e6dc@sha256:27a3231275db6e47ad615a4c88f5550c52e902a4e3bcd3cad73fff4bab87bb84 \ as jnilib # https://github.com/xerial/snappy-java/blob/master/src/main/java/org/xerial/snappy/OSInfo.java#L113 @@ -9,7 +9,7 @@ RUN set -ex; \ mkdir -pv native/$LIBPATH; \ cp -v /usr/lib/$ARCH-linux-gnu/jni/* native/$LIBPATH/ -FROM --platform=$TARGETPLATFORM docker.io/yolean/builder-quarkus:9e78afadc64b01ef70c00add6039f3f31e1c2542@sha256:c8fba1c08a1af43a7038f3337fe2af98aec15a7cf31c3dc12c39ade540c827f7 \ +FROM --platform=$TARGETPLATFORM docker.io/yolean/builder-quarkus:394fc7c4a84a1b54cf9558b44a6607911c01e6dc@sha256:27a3231275db6e47ad615a4c88f5550c52e902a4e3bcd3cad73fff4bab87bb84 \ as dev COPY pom.xml . 
@@ -38,7 +38,7 @@ ARG build="package -Pnative" RUN mvn --batch-mode $build -FROM --platform=$TARGETPLATFORM docker.io/yolean/runtime-quarkus-ubuntu-jre:9e78afadc64b01ef70c00add6039f3f31e1c2542@sha256:4d8ed25a81daac1f0434081346fcad719be13fd8dfbf4a793821b22149d5b79e \ +FROM --platform=$TARGETPLATFORM docker.io/yolean/runtime-quarkus-ubuntu-jre:394fc7c4a84a1b54cf9558b44a6607911c01e6dc@sha256:257887716945e41ab02d45da2740a23c105731144893507cb10dc2be94a363bf \ as jvm WORKDIR /app @@ -50,7 +50,7 @@ ENTRYPOINT [ "java", \ "-Djava.util.logging.manager=org.jboss.logmanager.LogManager", \ "-jar", "quarkus-run.jar" ] -FROM --platform=$TARGETPLATFORM docker.io/yolean/runtime-quarkus-ubuntu:177518b0a77298d34f0caf1d0fcdc13750c355a8@sha256:ccd94ad8f1b6d90aa90f5fa5efaf2db57b0d3e6a29faea84ab1edc5777a23c99 +FROM --platform=$TARGETPLATFORM docker.io/yolean/runtime-quarkus-ubuntu:394fc7c4a84a1b54cf9558b44a6607911c01e6dc@sha256:64d2e21c5f44d3e518a882d3b794805789fb25eba2d0cc31c597d703f943ca4d COPY --from=dev /workspace/target/*-runner /usr/local/bin/quarkus
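
A minimal sketch of the synchronization concern addressed by PATCH 20 above: the informer may invoke the event handler from different worker threads while the handler mutates two collections in sequence, so the updates must be atomic relative to readers. The names below (TargetTracker, targets, unreadyTargets) are illustrative assumptions, not the project's actual class or fields.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TargetTracker {

  // Both maps are touched by handleEvent; without synchronization a second
  // event or a reader could observe one map updated but not the other.
  private final Map<String, String> targets = new HashMap<>();
  private final Map<String, List<String>> unreadyTargets = new HashMap<>();

  // synchronized makes the two updates atomic with respect to the other
  // synchronized methods on this instance
  synchronized void handleEvent(String ip, String podName, boolean ready) {
    if (ready) {
      targets.put(ip, podName);
      unreadyTargets.remove(ip);
    } else {
      targets.remove(ip);
      unreadyTargets.put(ip, List.of(podName));
    }
  }

  // Readers also synchronize and receive a copy, so they always see a
  // state produced by a completed handleEvent call
  synchronized Map<String, String> getTargets() {
    return Map.copyOf(targets);
  }
}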