Skip to content

Commit

Permalink
Merge pull request #56 from Yolean/watcher-handle-graceful-onclose
Browse files Browse the repository at this point in the history
We've run this for a few weeks now without issue. The informer seems to behave very gracefully!
  • Loading branch information
NiklasJonsson6 authored Feb 13, 2024
2 parents 2473614 + d3ecc7a commit 44affa5
Show file tree
Hide file tree
Showing 14 changed files with 436 additions and 44 deletions.
8 changes: 4 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=$TARGETPLATFORM docker.io/yolean/builder-quarkus:9e78afadc64b01ef70c00add6039f3f31e1c2542@sha256:c8fba1c08a1af43a7038f3337fe2af98aec15a7cf31c3dc12c39ade540c827f7 \
FROM --platform=$TARGETPLATFORM docker.io/yolean/builder-quarkus:394fc7c4a84a1b54cf9558b44a6607911c01e6dc@sha256:27a3231275db6e47ad615a4c88f5550c52e902a4e3bcd3cad73fff4bab87bb84 \
as jnilib

# https://github.com/xerial/snappy-java/blob/master/src/main/java/org/xerial/snappy/OSInfo.java#L113
Expand All @@ -9,7 +9,7 @@ RUN set -ex; \
mkdir -pv native/$LIBPATH; \
cp -v /usr/lib/$ARCH-linux-gnu/jni/* native/$LIBPATH/

FROM --platform=$TARGETPLATFORM docker.io/yolean/builder-quarkus:9e78afadc64b01ef70c00add6039f3f31e1c2542@sha256:c8fba1c08a1af43a7038f3337fe2af98aec15a7cf31c3dc12c39ade540c827f7 \
FROM --platform=$TARGETPLATFORM docker.io/yolean/builder-quarkus:394fc7c4a84a1b54cf9558b44a6607911c01e6dc@sha256:27a3231275db6e47ad615a4c88f5550c52e902a4e3bcd3cad73fff4bab87bb84 \
as dev

COPY pom.xml .
Expand Down Expand Up @@ -38,7 +38,7 @@ ARG build="package -Pnative"

RUN mvn --batch-mode $build

FROM --platform=$TARGETPLATFORM docker.io/yolean/runtime-quarkus-ubuntu-jre:9e78afadc64b01ef70c00add6039f3f31e1c2542@sha256:4d8ed25a81daac1f0434081346fcad719be13fd8dfbf4a793821b22149d5b79e \
FROM --platform=$TARGETPLATFORM docker.io/yolean/runtime-quarkus-ubuntu-jre:394fc7c4a84a1b54cf9558b44a6607911c01e6dc@sha256:257887716945e41ab02d45da2740a23c105731144893507cb10dc2be94a363bf \
as jvm

WORKDIR /app
Expand All @@ -50,7 +50,7 @@ ENTRYPOINT [ "java", \
"-Djava.util.logging.manager=org.jboss.logmanager.LogManager", \
"-jar", "quarkus-run.jar" ]

FROM --platform=$TARGETPLATFORM docker.io/yolean/runtime-quarkus-ubuntu:177518b0a77298d34f0caf1d0fcdc13750c355a8@sha256:ccd94ad8f1b6d90aa90f5fa5efaf2db57b0d3e6a29faea84ab1edc5777a23c99
FROM --platform=$TARGETPLATFORM docker.io/yolean/runtime-quarkus-ubuntu:394fc7c4a84a1b54cf9558b44a6607911c01e6dc@sha256:64d2e21c5f44d3e518a882d3b794805789fb25eba2d0cc31c597d703f943ca4d

COPY --from=dev /workspace/target/*-runner /usr/local/bin/quarkus

Expand Down
21 changes: 20 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>3.6.0</quarkus.platform.version>
<quarkus.platform.version>3.6.8</quarkus.platform.version>
<skipITs>true</skipITs>
<surefire-plugin.version>3.1.2</surefire-plugin.version>
<x-test-exclude-groups>devservices</x-test-exclude-groups>
Expand Down Expand Up @@ -61,6 +61,25 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kubernetes-client</artifactId>
</dependency>
<!-- These are optional dependencies of kubernetes-client, needed for testcontainers k3s to work -->
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>1.77</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk18on</artifactId>
<version>1.77</version>
<scope>test</scope>
</dependency>
<!-- -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>k3s</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion skaffold.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ build:
# buildArgs:
# build: package
custom:
buildCommand: y-build --opt target=jvm --opt build-arg:build=package
buildCommand: EXPORT_CACHE=false y-build --opt target=jvm --opt build-arg:build=package
dependencies:
dockerfile:
path: ./Dockerfile
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package se.yolean.kafka.keyvalue.kubernetes;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -19,11 +20,14 @@
import io.fabric8.kubernetes.api.model.EndpointAddress;
import io.fabric8.kubernetes.api.model.Endpoints;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.fabric8.kubernetes.client.Watcher.Action;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;
import se.yolean.kafka.keyvalue.onupdate.UpdatesBodyPerTopic;

@ApplicationScoped
Expand All @@ -36,37 +40,65 @@ public class EndpointsWatcher {
private List<BiConsumer<UpdatesBodyPerTopic, Map<String, String>>> onTargetReadyConsumers = new ArrayList<>();

private final String targetServiceName;
private final String targetServiceNamespace;
private final Duration resyncPeriod;
private final boolean watchEnabled;

@Inject
KubernetesClient client;

@Inject
public EndpointsWatcher(EndpointsWatcherConfig config) {
Vertx vertx;

private SharedIndexInformer<Endpoints> informer;

private Counter countEvent;

@Inject
public EndpointsWatcher(EndpointsWatcherConfig config, MeterRegistry registry) {
this.resyncPeriod = config.informerResyncPeriod();
if (config.targetServiceName().isPresent()) {
watchEnabled = true;
targetServiceName = config.targetServiceName().orElseThrow();
targetServiceNamespace = config.targetServiceNamespace().orElseThrow(() -> {
return new RuntimeException("target service namespace is required when watch is enabled");
});
} else {
watchEnabled = false;
targetServiceName = null;
targetServiceNamespace = null;
}
Gauge.builder("kkv.watcher.down", () -> {
if (isWatching()) return 0.0;
return 1.0;
}).register(registry);
countEvent = Counter.builder("kkv.watcher.event").register(registry);
countEvent.increment(0);
}

void start(@Observes StartupEvent ev) {
logger.info("EndpointsWatcher onStart");
if (watchEnabled) {
watch();
inform();
} else {
logger.info("No target service name configured, EndpointsWatcher is disabled");
}
}

/**
 * Tells whether an informer has been created and reports itself as watching.
 *
 * @return {@code false} when no informer exists yet, otherwise whatever the
 *         informer's own {@code isWatching()} returns
 */
public boolean isWatching() {
  if (informer == null) {
    return false;
  }
  return informer.isWatching();
}

/**
 * Appends a consumer to the list of on-target-ready consumers.
 *
 * NOTE(review): judging by the field name, these are invoked with pending
 * updates and a target map once a target becomes ready; the invoking code is
 * outside this view, so confirm there.
 *
 * @param consumer callback taking the updates body and a map of targets
 */
public void addOnReadyConsumer(BiConsumer<UpdatesBodyPerTopic, Map<String, String>> consumer) {
  this.onTargetReadyConsumers.add(consumer);
}

void handleEvent(Action action, Endpoints resource) {
synchronized void handleEvent(Action action, Endpoints resource) {
logger.debug("endpoints watch received action: {}", action.toString());
if (action.equals(Action.DELETED)) {
clearEndpoints();
return;
}

endpoints = resource.getSubsets().stream()
.map(subset -> subset.getAddresses())
Expand Down Expand Up @@ -109,21 +141,57 @@ void handleEvent(Action action, Endpoints resource) {
logger.info("Set new targets: {}", mapEndpointsToTargets(endpoints));
}

private void watch() {
client.endpoints().withName(targetServiceName).watch(new Watcher<Endpoints>() {
@Override
public void eventReceived(Action action, Endpoints resource) {
handleEvent(action, resource);
}
/**
 * Clears both endpoint collections (unready and ready) and logs that all
 * targets were cleared.
 *
 * REVIEW: Since we only watch a single "Endpoints" object (the full collection
 * of addresses), a delete event means that every endpoint should be cleared.
 * If we switch from watching "Endpoints" to "EndpointSlices", as suggested in
 * https://kubernetes.io/docs/concepts/services-networking/service/#endpoints,
 * a single slice is not guaranteed to contain all addresses, which would make
 * this method more complex.
 *
 * NOTE(review): {@code endpoints} is reassigned from a stream pipeline in
 * handleEvent; confirm that pipeline yields a mutable collection, since
 * {@code clear()} on an unmodifiable one would throw.
 */
private void clearEndpoints() {
unreadyEndpoints.clear();
endpoints.clear();
logger.info("Cleared all targets");
}

@Override
public void onClose(WatcherException cause) {
// REVIEW what is a reasonable strategy here?
logger.warn("Exiting application due to watch closed");
logger.error(cause.getMessage());
Quarkus.asyncExit(11);
}
});
/**
 * Creates the informer if none exists yet, then logs that it was started.
 * The log line is written whether or not a new informer was created.
 */
private void inform() {
  if (informer == null) {
    createInformer();
  }
  logger.info("Started informer");
}

/**
 * Creates the informer for the single "Endpoints" object of the target service
 * and stores it in {@code informer}.
 *
 * Every informer callback (add, update and delete) increments the
 * "kkv.watcher.event" counter and is forwarded to
 * {@code handleEvent} with the matching {@link Action}. The configured resync
 * period is passed to the informer in milliseconds.
 */
private void createInformer() {
  informer = client.endpoints()
      .inNamespace(targetServiceNamespace)
      .withName(targetServiceName)
      .inform(
          new ResourceEventHandler<Endpoints>() {

            @Override
            public void onAdd(Endpoints obj) {
              countEvent.increment();
              handleEvent(Action.ADDED, obj);
            }

            @Override
            public void onUpdate(Endpoints oldObj, Endpoints newObj) {
              countEvent.increment();
              handleEvent(Action.MODIFIED, newObj);
            }

            @Override
            public void onDelete(Endpoints obj, boolean deletedFinalStateUnknown) {
              // The "Endpoints" object we watch contains all endpoints for the target
              // service. Delete only happens if the service itself is deleted, never when
              // individual endpoint-addresses change.
              logger.warn("Endpoints onDelete {}", obj);
              // Count deletes too, so the event counter covers every callback
              // instead of silently under-reporting.
              countEvent.increment();
              handleEvent(Action.DELETED, obj);
            }

          }, resyncPeriod.toMillis()
      );
}

private void emitPendingUpdatesToNowReadyTarget(EndpointAddress address, List<UpdatesBodyPerTopic> updates) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package se.yolean.kafka.keyvalue.kubernetes;

import java.time.Duration;
import java.util.Optional;

import io.smallrye.config.ConfigMapping;
Expand All @@ -8,7 +9,14 @@
/**
 * Configuration mapped from keys under the "kkv.target.service" prefix.
 */
@ConfigMapping(prefix = "kkv.target.service")
public interface EndpointsWatcherConfig {

  /** @return the value of the "namespace" key, if set. */
  @WithName("namespace")
  Optional<String> targetServiceNamespace();

  /** @return the value of the "name" key, if set. */
  @WithName("name")
  Optional<String> targetServiceName();

  /** @return how often the informer rebuilds its cache. */
  @WithName("informer-resync-period")
  Duration informerResyncPeriod();

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.smallrye.mutiny.Uni;
import io.vertx.core.buffer.Buffer;
Expand All @@ -28,21 +29,29 @@ public class UpdatesDispatcherWebclient implements UpdatesDispatcher {

EndpointsWatcher watcher;

private MeterRegistry registry;

@Inject
UpdatesDispatcherWebclientConfig config;

private final WebClient webClient;

static void initMetrics(MeterRegistry registry) {
registry.counter("kkv.target.update.failure").increment(0);
private Counter countFailures;

private Counter countSuccess;

/**
 * Registers the failure and success counters for target updates and stores
 * them in {@code countFailures} and {@code countSuccess}.
 *
 * Both counters are incremented by zero right away so that they are present
 * before the first real increment.
 *
 * @param registry meter registry that the counters are registered with
 */
void initMetrics(MeterRegistry registry) {
  final Counter failures = Counter
      .builder("kkv.target.update.failure")
      .description("Failures to confirm update of a target endpoint, after retries")
      .register(registry);
  final Counter successes = Counter
      .builder("kkv.target.update.ok")
      .description("Confirm updates of a target endpoint, after retries")
      .register(registry);

  failures.increment(0);
  successes.increment(0);

  this.countFailures = failures;
  this.countSuccess = successes;
}

@Inject
public UpdatesDispatcherWebclient(Vertx vertx, MeterRegistry registry, EndpointsWatcher watcher) {
this.webClient = WebClient.create(vertx);
this.registry = registry;
this.watcher = watcher;

initMetrics(registry);
Expand Down Expand Up @@ -76,6 +85,7 @@ private void dispatch(UpdatesBodyPerTopic body, Map<String, String> targets) {
String name = entry.getValue();

dispatch(json, headers, ip, config.targetServicePort()).subscribe().with(item -> {
countSuccess.increment();
logger.info("Successfully sent update to {}", name);
}, getDispatchFailureConsumer(name));
});
Expand All @@ -91,7 +101,7 @@ private Uni<?> dispatch(JsonObject json, Map<String, String> headers, String hos

private Consumer<Throwable> getDispatchFailureConsumer(String name) {
return (t) -> {
registry.counter("kkv.target.update.failure").increment();
countFailures.increment();
logger.error("Failed to send update to " + name, t);
};
}
Expand Down
8 changes: 8 additions & 0 deletions src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ kkv:
service:
name: ${TARGET_SERVICE_NAME:}
port: ${TARGET_SERVICE_PORT:8080}
namespace: ${TARGET_SERVICE_NAMESPACE:}
informer-resync-period: ${INFORMER_RESYNC_PERIOD:5m}
static:
host: ${TARGET_STATIC_HOST:}
port: ${TARGET_STATIC_PORT:8080}
Expand Down Expand Up @@ -50,6 +52,12 @@ quarkus:
level: DEBUG
"org.apache.kafka.clients.Metadata":
level: DEBUG
"io.fabric8.kubernetes.client":
level: DEBUG
min-level: DEBUG
"io.vertx.core.http":
level: DEBUG
min-level: DEBUG

kafka:
snappy:
Expand Down
Loading

0 comments on commit 44affa5

Please sign in to comment.