diff --git a/core/src/main/java/io/smallrye/stork/impl/CachingServiceDiscovery.java b/core/src/main/java/io/smallrye/stork/impl/CachingServiceDiscovery.java index 87d47d72..d63216ac 100644 --- a/core/src/main/java/io/smallrye/stork/impl/CachingServiceDiscovery.java +++ b/core/src/main/java/io/smallrye/stork/impl/CachingServiceDiscovery.java @@ -42,11 +42,22 @@ public CachingServiceDiscovery(String refreshPeriod) { } this.lastResults = Collections.emptyList(); - this.instances = Uni.createFrom().deferred(() -> fetchNewServiceInstances(this.lastResults) + Uni> retrieval = Uni.createFrom().deferred(() -> fetchNewServiceInstances(this.lastResults) .invoke(l -> this.lastResults = l) .onFailure().invoke(this::handleFetchError) - .onFailure().recoverWithItem(this.lastResults)) - .memoize().atLeast(this.refreshPeriod); + .onFailure().recoverWithItem(this.lastResults)); + this.instances = cache(retrieval); + } + + /*** + * Configures the period to keep service instances in the cache. Elements will be refetched after the given period. + * This method can be extended by the provider in order to change the logic for caching service instances. + * + * @param uni service instances retrieved from backing discovery source + * @return cached list of service instances in form of Uni + */ + public Uni> cache(Uni> uni) { + return uni.memoize().atLeast(this.refreshPeriod); } /** diff --git a/docs/docs/service-discovery/custom-service-discovery.md b/docs/docs/service-discovery/custom-service-discovery.md index bfa87664..220372c8 100644 --- a/docs/docs/service-discovery/custom-service-discovery.md +++ b/docs/docs/service-discovery/custom-service-discovery.md @@ -104,7 +104,7 @@ Remember that attributes, like `host`, are declared using the `@ServiceDiscovery ## Caching the service instances -Your `ServiceDiscovery` implementation can extend `io.smallrye.stork.impl.CachingServiceDiscovery` to automatically _cache_ the service instance. +Your `ServiceDiscovery` implementation can extend `io.smallrye.stork.impl.CachingServiceDiscovery` to automatically _cache_ the service instances. In this case, the retrieved set of `ServiceInstance` is cached and only updated after some time. This duration is an additional configuration attribute. For homogeneity, we recommend the following attribute: @@ -134,3 +134,19 @@ Extending `io.smallrye.stork.impl.CachingServiceDiscovery` changes the structure If the retrieval fails, the error is reported, and Stork keeps the previously retrieved list of instances. +### Customizing the caching strategy + +Sometimes it can be useful to change this behaviour and customize the cache expiration strategy. + +For example, imagine you are using a backend service discovery where service instances can change very frequently. + +Moreover, contacting the backend service discovery can be expensive in terms of computing, +thus finding a good value for the refreshing time can be mission impossible. + +For these situations, Stork allows to implement a better expiration strategy for the cache. + +If you want to customize the expiration strategy, you need: +1. Implement the `cache` method where the expiration strategy should be defined. +2. Invalidate the cache when expiration condition evaluates to true. + +Take a look to the [Kubernetes Service Discovery](kubernetes.md#Caching the service instances) for further details about this feature. diff --git a/docs/docs/service-discovery/kubernetes.md b/docs/docs/service-discovery/kubernetes.md index a1527489..46c05c0c 100644 --- a/docs/docs/service-discovery/kubernetes.md +++ b/docs/docs/service-discovery/kubernetes.md @@ -95,4 +95,20 @@ Then, it can select the instance. Supported attributes are the following: ---8<-- "service-discovery/kubernetes/target/classes/META-INF/stork-docs/kubernetes-sd-attributes.txt" \ No newline at end of file +--8<-- "service-discovery/kubernetes/target/classes/META-INF/stork-docs/kubernetes-sd-attributes.txt" + + +## Caching the service instances + +Contacting the cluster too much frequently can result in performance problems. It's why Kubernetes Service discovery extends `io.smallrye.stork.impl.CachingServiceDiscovery` to automatically _cache_ the service instances. +Moreover, the caching expiration has been also improved in order to only update the retrieved set of `ServiceInstance` if some of them changes and an event is emitted. +This is done by creating an [Informer](https://www.javadoc.io/static/io.fabric8/kubernetes-client-api/6.1.1/io/fabric8/kubernetes/client/informers/SharedIndexInformer.html), similar to a [Watch](https://www.javadoc.io/static/io.fabric8/kubernetes-client-api/6.1.1/io/fabric8/kubernetes/client/Watch.html), able to observe the events on the service instances resources. + +--8<-- "src/main/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscovery.java" + +Note that: + - the cache is invalidated when an event is received. + - the cache is validated once the instances are retrieved from the cluster, in the `fetchNewServiceInstances` method. + - the `cache` method is overrided to customize the expiration strategy. In this case the collection of service instances will be kept until an event occurs. + + diff --git a/service-discovery/kubernetes/src/main/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscovery.java b/service-discovery/kubernetes/src/main/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscovery.java index c7aa619e..b2bb875b 100644 --- a/service-discovery/kubernetes/src/main/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscovery.java +++ b/service-discovery/kubernetes/src/main/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscovery.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -22,8 +23,9 @@ import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.ConfigBuilder; -import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.smallrye.mutiny.Uni; import io.smallrye.stork.api.Metadata; import io.smallrye.stork.api.ServiceInstance; @@ -50,6 +52,8 @@ public class KubernetesServiceDiscovery extends CachingServiceDiscovery { private static final Logger LOGGER = LoggerFactory.getLogger(KubernetesServiceDiscovery.class); + private AtomicBoolean invalidated = new AtomicBoolean(); + /** * Creates a new KubernetesServiceDiscovery. * @@ -74,9 +78,39 @@ public KubernetesServiceDiscovery(String serviceName, KubernetesConfiguration co Config k8sConfig = new ConfigBuilder(base) .withMasterUrl(masterUrl) .withNamespace(namespace).build(); - this.client = new DefaultKubernetesClient(k8sConfig); + this.client = new KubernetesClientBuilder().withConfig(k8sConfig).build(); this.vertx = vertx; this.secure = isSecure(config); + client.endpoints().inform(new ResourceEventHandler() { + @Override + public void onAdd(Endpoints obj) { + LOGGER.info("Endpoint added: {}", obj.getMetadata().getName()); + invalidate(); + } + + @Override + public void onUpdate(Endpoints oldObj, Endpoints newObj) { + LOGGER.info("Endpoint updated : {}", newObj.getMetadata().getName()); + invalidate(); + } + + @Override + public void onDelete(Endpoints obj, boolean deletedFinalStateUnknown) { + LOGGER.info("Endpoint deleted: {}", obj.getMetadata().getName()); + invalidate(); + } + + }); + + } + + @Override + public Uni> cache(Uni> uni) { + return uni.memoize().until(() -> invalidated.get()); + } + + public void invalidate() { + invalidated.set(true); } @Override @@ -129,7 +163,8 @@ public Uni> fetchNewServiceInstances(List } }); }); - return endpointsUni.onItem().transform(endpoints -> toStorkServiceInstances(endpoints, previousInstances)); + return endpointsUni.onItem().transform(endpoints -> toStorkServiceInstances(endpoints, previousInstances)) + .invoke(() -> invalidated.set(false)); } private List toStorkServiceInstances(Map> backend, diff --git a/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryTest.java b/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryTest.java index b76e9f70..a2d339a2 100644 --- a/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryTest.java +++ b/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryTest.java @@ -6,6 +6,7 @@ import static org.assertj.core.api.Assertions.fail; import static org.awaitility.Awaitility.await; +import java.net.HttpURLConnection; import java.time.Duration; import java.util.Arrays; import java.util.HashMap; @@ -15,7 +16,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import org.awaitility.core.ConditionTimeoutException; import org.hamcrest.Matchers; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -31,6 +34,8 @@ import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; +import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; +import io.smallrye.common.constraint.Assert; import io.smallrye.stork.Stork; import io.smallrye.stork.api.Metadata; import io.smallrye.stork.api.Service; @@ -44,6 +49,8 @@ public class KubernetesServiceDiscoveryTest { KubernetesClient client; + KubernetesMockServer server; + String k8sMasterUrl; String defaultNamespace; @@ -64,7 +71,7 @@ void shouldGetServiceFromK8sDefaultNamespace() { String serviceName = "svc"; String[] ips = { "10.96.96.231", "10.96.96.232", "10.96.96.233" }; - registerKubernetesService(serviceName, defaultNamespace, ips); + registerKubernetesResources(serviceName, defaultNamespace, ips); AtomicReference> instances = new AtomicReference<>(); @@ -102,7 +109,7 @@ void shouldGetServiceFromK8sDefaultNamespaceUsingProgrammaticAPI() { String serviceName = "svc"; String[] ips = { "10.96.96.231", "10.96.96.232", "10.96.96.233" }; - registerKubernetesService(serviceName, defaultNamespace, ips); + registerKubernetesResources(serviceName, defaultNamespace, ips); AtomicReference> instances = new AtomicReference<>(); @@ -141,7 +148,7 @@ void shouldHandleSecureAttribute() { String serviceName = "svc"; String[] ips = { "10.96.96.231", "10.96.96.232", "10.96.96.233" }; - registerKubernetesService(serviceName, defaultNamespace, ips); + registerKubernetesResources(serviceName, defaultNamespace, ips); AtomicReference> instances = new AtomicReference<>(); @@ -179,8 +186,9 @@ void shouldDiscoverServiceWithSpecificName() { String serviceName = "svc"; - registerKubernetesService("rest-service", defaultNamespace, "10.96.96.231", "10.96.96.232", "10.96.96.233"); - registerKubernetesService("svc", defaultNamespace, "10.95.95.125"); + registerKubernetesResources("rest-service", defaultNamespace, "10.96.96.231", "10.96.96.232", + "10.96.96.233"); + registerKubernetesResources("svc", defaultNamespace, "10.95.95.125"); AtomicReference> instances = new AtomicReference<>(); @@ -198,22 +206,23 @@ void shouldDiscoverServiceWithSpecificName() { "10.96.96.232", "10.96.96.233"); for (ServiceInstance serviceInstance : instances.get()) { Map labels = serviceInstance.getLabels(); - assertThat(labels).contains(entry("app.kubernetes.io/name", "svc"), + assertThat(labels).contains(entry("app.kubernetes.io/name", "rest-service"), entry("app.kubernetes.io/version", "1.0"), entry("ui", "ui-" + ipAsSuffix(serviceInstance.getHost()))); } } @Test - void shouldGetServiceFromK8sNamespace() { + void shouldGetServiceFromSpecificNamespace() { TestConfigProvider.addServiceConfig("svc", null, "kubernetes", null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", "ns1")); Stork stork = StorkTestUtils.getNewStorkInstance(); String serviceName = "svc"; + String specificNs = "ns1"; - registerKubernetesService(serviceName, "ns1", "10.96.96.231", "10.96.96.232", "10.96.96.233"); + registerKubernetesResources(serviceName, specificNs, "10.96.96.231", "10.96.96.232", "10.96.96.233"); AtomicReference> instances = new AtomicReference<>(); @@ -245,15 +254,15 @@ void shouldGetServiceFromK8sNamespace() { } @Test - void shouldGetServiceFromK8sAllNamespace() { + void shouldGetServiceFromAllNamespace() { TestConfigProvider.addServiceConfig("svc", null, "kubernetes", null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", "all")); Stork stork = StorkTestUtils.getNewStorkInstance(); String serviceName = "svc"; - registerKubernetesService(serviceName, "ns1", "10.96.96.231", "10.96.96.232", "10.96.96.233"); - registerKubernetesService(serviceName, "ns2", "10.99.99.241", "10.99.99.242", "10.99.99.243"); + registerKubernetesResources(serviceName, "ns1", "10.96.96.231", "10.96.96.232", "10.96.96.233"); + registerKubernetesResources(serviceName, "ns2", "10.99.99.241", "10.99.99.242", "10.99.99.243"); AtomicReference> instances = new AtomicReference<>(); @@ -285,19 +294,15 @@ void shouldGetServiceFromK8sAllNamespace() { } @Test - void shouldNotFetchWhenRefreshPeriodNotReached() { - //Given a service `my-service` registered in k8s and a refresh-period of 5 minutes - // 1- services instance are gathered form k8s - // 2- we remove the service - // when the k8s service discovery is called before the end of refreshing period - // Then stork returns the instances from the cache + void shouldPreserveIdsOnRefetch() throws InterruptedException { + TestConfigProvider.addServiceConfig("svc", null, "kubernetes", - null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", defaultNamespace)); + null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", defaultNamespace, "refresh-period", "3")); Stork stork = StorkTestUtils.getNewStorkInstance(); String serviceName = "svc"; - registerKubernetesService(serviceName, null, "10.96.96.231", "10.96.96.232", "10.96.96.233"); + registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231", "10.96.96.232", "10.96.96.233"); AtomicReference> instances = new AtomicReference<>(); @@ -314,30 +319,60 @@ void shouldNotFetchWhenRefreshPeriodNotReached() { assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231", "10.96.96.232", "10.96.96.233"); + Map idsByHostname = mapHostnameToIds(instances.get()); + client.endpoints().withName(serviceName).delete(); + client.pods().withName("svc-109696231").delete(); + client.pods().withName("svc-109696232").delete(); + client.pods().withName("svc-109696233").delete(); + + registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231", "10.96.96.232"); + + Thread.sleep(5000); service.getServiceDiscovery().getServiceInstances() .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) .subscribe().with(instances::set); await().atMost(Duration.ofSeconds(5)) - .until(() -> instances.get() != null); + .until(instances::get, Matchers.hasSize(2)); - assertThat(instances.get()).hasSize(3); - assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); - assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231", - "10.96.96.232", "10.96.96.233"); for (ServiceInstance serviceInstance : instances.get()) { - Map labels = serviceInstance.getLabels(); - assertThat(labels).contains(entry("app.kubernetes.io/name", "svc"), - entry("app.kubernetes.io/version", "1.0"), - entry("ui", "ui-" + ipAsSuffix(serviceInstance.getHost()))); + assertThat(idsByHostname.get(serviceInstance.getHost())).isEqualTo(serviceInstance.getId()); } + client.endpoints().withName(serviceName).delete(); + client.pods().withName("svc-109696231").delete(); + client.pods().withName("svc-109696232").delete(); + + registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231", "10.96.96.232", "10.96.96.234"); + + Thread.sleep(5000); + + service.getServiceDiscovery().getServiceInstances() + .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) + .subscribe().with(instances::set); + + await().atMost(Duration.ofSeconds(5)) + .until(instances::get, Matchers.hasSize(3)); + + for (ServiceInstance serviceInstance : instances.get()) { + if (serviceInstance.getHost().equals("10.96.96.234")) { + assertThat(idsByHostname.containsValue(serviceInstance.getId())).isFalse(); + } else { + assertThat(idsByHostname.get(serviceInstance.getHost())).isEqualTo(serviceInstance.getId()); + } + } } @Test - void shouldRefetchWhenRefreshPeriodReached() throws InterruptedException { + void shouldFetchInstancesFromTheClusterWhenCacheIsInvalidated() throws InterruptedException { + + // Given a service with 3 instances registered in the cluster + // Stork gather the cache from the cluster + // When the endpoints are removed (this invalidates the cache) + // Stork is called to get service instances again + // Stork contacts the cluster to get the instances : it gets 0 of them TestConfigProvider.addServiceConfig("svc", null, "kubernetes", null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", defaultNamespace, "refresh-period", "3")); @@ -345,7 +380,7 @@ void shouldRefetchWhenRefreshPeriodReached() throws InterruptedException { String serviceName = "svc"; - registerKubernetesService(serviceName, null, "10.96.96.231", "10.96.96.232", "10.96.96.233"); + registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231", "10.96.96.232", "10.96.96.233"); AtomicReference> instances = new AtomicReference<>(); @@ -377,7 +412,13 @@ void shouldRefetchWhenRefreshPeriodReached() throws InterruptedException { } @Test - void shouldPreserveIdsOnRefetch() throws InterruptedException { + void shouldFetchInstancesFromTheCache() throws InterruptedException { + + // Given an endpoint registered in the cluster + // Stork gather the cache from the cluster + // When an expectation is configured to throw an Error the next time we contact the cluster to get the endpoints and + // Stork is called to get service instances + // Stork get the instances from the cache: the error is not thrown because the cluster is not contacted. TestConfigProvider.addServiceConfig("svc", null, "kubernetes", null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", defaultNamespace, "refresh-period", "3")); @@ -385,7 +426,7 @@ void shouldPreserveIdsOnRefetch() throws InterruptedException { String serviceName = "svc"; - registerKubernetesService(serviceName, defaultNamespace, "10.96.96.231", "10.96.96.232", "10.96.96.233"); + registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231"); AtomicReference> instances = new AtomicReference<>(); @@ -397,53 +438,77 @@ void shouldPreserveIdsOnRefetch() throws InterruptedException { await().atMost(Duration.ofSeconds(5)) .until(() -> instances.get() != null); - assertThat(instances.get()).hasSize(3); + assertThat(instances.get()).hasSize(1); assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); - assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231", - "10.96.96.232", "10.96.96.233"); + assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231"); - Map idsByHostname = mapHostnameToIds(instances.get()); + server.expect().get().withPath("/api/v1/namespaces/test/endpoints?fieldSelector=metadata.name%3Dsvc") + .andReturn(HttpURLConnection.HTTP_INTERNAL_ERROR, "{}").once(); - client.endpoints().withName(serviceName).delete(); - client.pods().withName("svc-109696231").delete(); - client.pods().withName("svc-109696232").delete(); - client.pods().withName("svc-109696233").delete(); - registerKubernetesService(serviceName, defaultNamespace, "10.96.96.231", "10.96.96.232"); + service.getServiceDiscovery().getServiceInstances() + .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) + .subscribe().with(instances::set); - Thread.sleep(5000); + await().atMost(Duration.ofSeconds(5)) + .until(() -> instances.get() != null); + + assertThat(instances.get()).hasSize(1); + assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); + assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231"); + } + + @Test + void shouldGetInstancesFromTheCluster() throws InterruptedException { + + // Given an endpoint registered in the cluster + // Stork gather the cache from the cluster + // When an expectation in the cluster is configured to throw an Error the next time we try to get the endpoints and + // When the endpoint is removed (this invalidates the cache) + // Stork is called to get service instances again + // Stork gets the instances from the cache: the error is not thrown because the cluster is not contacted. + TestConfigProvider.addServiceConfig("svc", null, "kubernetes", + null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", defaultNamespace, "refresh-period", "3")); + Stork stork = StorkTestUtils.getNewStorkInstance(); + + String serviceName = "svc"; + + registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231"); + + AtomicReference> instances = new AtomicReference<>(); + + Service service = stork.getService(serviceName); service.getServiceDiscovery().getServiceInstances() .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) .subscribe().with(instances::set); await().atMost(Duration.ofSeconds(5)) - .until(instances::get, Matchers.hasSize(2)); + .until(() -> instances.get() != null); - for (ServiceInstance serviceInstance : instances.get()) { - assertThat(idsByHostname.get(serviceInstance.getHost())).isEqualTo(serviceInstance.getId()); - } + assertThat(instances.get()).hasSize(1); + assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); + assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231"); - client.endpoints().withName(serviceName).delete(); - client.pods().withName("svc-109696231").delete(); - client.pods().withName("svc-109696232").delete(); - registerKubernetesService(serviceName, defaultNamespace, "10.96.96.231", "10.96.96.232", "10.96.96.234"); + server.expect().get().withPath("/api/v1/namespaces/test/endpoints?fieldSelector=metadata.name%3Dsvc") + .andReturn(HttpURLConnection.HTTP_INTERNAL_ERROR, "{}").once(); - Thread.sleep(5000); + client.endpoints().withName(serviceName).delete(); service.getServiceDiscovery().getServiceInstances() .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) .subscribe().with(instances::set); - await().atMost(Duration.ofSeconds(5)) - .until(instances::get, Matchers.hasSize(3)); + Assertions.assertThrows(ConditionTimeoutException.class, + () -> await() + .atMost(Duration.ofSeconds(5)) + .until(() -> instances.get().isEmpty())); - for (ServiceInstance serviceInstance : instances.get()) { - if (serviceInstance.getHost().equals("10.96.96.234")) { - assertThat(idsByHostname.containsValue(serviceInstance.getId())).isFalse(); - } else { - assertThat(idsByHostname.get(serviceInstance.getHost())).isEqualTo(serviceInstance.getId()); - } - } + } + + private void registerKubernetesResources(String serviceName, String namespace, String... ips) { + Assert.checkNotNullParam("ips", ips); + buildAndRegisterKubernetesService(serviceName, namespace, true, ips); + Arrays.stream(ips).map(ip -> buildAndRegisterBackendPod(serviceName, namespace, true, ip)).collect(Collectors.toList()); } private Map mapHostnameToIds(List serviceInstances) { @@ -454,15 +519,13 @@ private Map mapHostnameToIds(List serviceInstance return result; } - private void registerKubernetesService(String serviceName, String namespace, + private Endpoints buildAndRegisterKubernetesService(String serviceName, String namespace, boolean register, String... ipAdresses) { Map serviceLabels = new HashMap<>(); - serviceLabels.put("app.kubernetes.io/name", "svc"); + serviceLabels.put("app.kubernetes.io/name", serviceName); serviceLabels.put("app.kubernetes.io/version", "1.0"); - registerBackendPods(serviceName, namespace, serviceLabels, ipAdresses); - List endpointAddresses = Arrays.stream(ipAdresses) .map(ipAddress -> { ObjectReference targetRef = new ObjectReference(null, null, "Pod", @@ -478,30 +541,38 @@ private void registerKubernetesService(String serviceName, String namespace, .build()) .build(); - if (namespace != null) { - client.endpoints().inNamespace(namespace).resource(endpoint).create(); - } else { - client.endpoints().resource(endpoint).create(); + if (register) { + if (namespace != null) { + client.endpoints().inNamespace(namespace).resource(endpoint).create(); + } else { + client.endpoints().resource(endpoint).create(); + } } + return endpoint; } - private void registerBackendPods(String name, String namespace, Map labels, String[] ipAdresses) { + private Pod buildAndRegisterBackendPod(String name, String namespace, boolean register, String ip) { + + Map serviceLabels = new HashMap<>(); + serviceLabels.put("app.kubernetes.io/name", name); + serviceLabels.put("app.kubernetes.io/version", "1.0"); - for (String ip : ipAdresses) { - Map podLabels = new HashMap<>(labels); - podLabels.put("ui", "ui-" + ipAsSuffix(ip)); - Pod backendPod = new PodBuilder().withNewMetadata().withName(name + "-" + ipAsSuffix(ip)) - .withLabels(podLabels) - .endMetadata() - .build(); + Map podLabels = new HashMap<>(serviceLabels); + podLabels.put("ui", "ui-" + ipAsSuffix(ip)); + Pod backendPod = new PodBuilder().withNewMetadata().withName(name + "-" + ipAsSuffix(ip)) + .withLabels(podLabels) + .withNamespace(namespace) + .endMetadata() + .build(); + if (register) { if (namespace != null) { client.pods().inNamespace(namespace).resource(backendPod).create(); } else { client.pods().resource(backendPod).create(); } - } + return backendPod; } private String ipAsSuffix(String ipAddress) {