Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve cache invalidation for k8s service discovery #374

Merged
merged 3 commits into from
Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,22 @@ public CachingServiceDiscovery(String refreshPeriod) {
}

this.lastResults = Collections.emptyList();
this.instances = Uni.createFrom().deferred(() -> fetchNewServiceInstances(this.lastResults)
Uni<List<ServiceInstance>> retrieval = Uni.createFrom().deferred(() -> fetchNewServiceInstances(this.lastResults)
.invoke(l -> this.lastResults = l)
.onFailure().invoke(this::handleFetchError)
.onFailure().recoverWithItem(this.lastResults))
.memoize().atLeast(this.refreshPeriod);
.onFailure().recoverWithItem(this.lastResults));
this.instances = cache(retrieval);
}

/***
* Configures the period to keep service instances in the cache. Elements will be refetched after the given period.
* This method can be extended by the provider in order to change the logic for caching service instances.
*
* @param uni service instances retrieved from backing discovery source
* @return cached list of service instances in form of Uni
*/
public Uni<List<ServiceInstance>> cache(Uni<List<ServiceInstance>> uni) {
return uni.memoize().atLeast(this.refreshPeriod);
}

/**
Expand Down
18 changes: 17 additions & 1 deletion docs/docs/service-discovery/custom-service-discovery.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ Remember that attributes, like `host`, are declared using the `@ServiceDiscovery

## Caching the service instances

Your `ServiceDiscovery` implementation can extend `io.smallrye.stork.impl.CachingServiceDiscovery` to automatically _cache_ the service instance.
Your `ServiceDiscovery` implementation can extend `io.smallrye.stork.impl.CachingServiceDiscovery` to automatically _cache_ the service instances.
In this case, the retrieved set of `ServiceInstance` is cached and only updated after some time.
This duration is an additional configuration attribute.
For homogeneity, we recommend the following attribute:
Expand Down Expand Up @@ -134,3 +134,19 @@ Extending `io.smallrye.stork.impl.CachingServiceDiscovery` changes the structure
If the retrieval fails, the error is reported, and Stork keeps the previously retrieved list of instances.


### Customizing the caching strategy

Sometimes it can be useful to change this behaviour and customize the cache expiration strategy.

For example, imagine you are using a backend service discovery where service instances can change very frequently.

Moreover, contacting the backend service discovery can be expensive in terms of computing,
thus finding a good value for the refreshing time can be mission impossible.

For these situations, Stork allows to implement a better expiration strategy for the cache.

If you want to customize the expiration strategy, you need:
1. Implement the `cache` method where the expiration strategy should be defined.
2. Invalidate the cache when expiration condition evaluates to true.

Take a look to the [Kubernetes Service Discovery](kubernetes.md#Caching the service instances) for further details about this feature.
18 changes: 17 additions & 1 deletion docs/docs/service-discovery/kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,20 @@ Then, it can select the instance.

Supported attributes are the following:

--8<-- "service-discovery/kubernetes/target/classes/META-INF/stork-docs/kubernetes-sd-attributes.txt"
--8<-- "service-discovery/kubernetes/target/classes/META-INF/stork-docs/kubernetes-sd-attributes.txt"


## Caching the service instances

Contacting the cluster too much frequently can result in performance problems. It's why Kubernetes Service discovery extends `io.smallrye.stork.impl.CachingServiceDiscovery` to automatically _cache_ the service instances.
Moreover, the caching expiration has been also improved in order to only update the retrieved set of `ServiceInstance` if some of them changes and an event is emitted.
This is done by creating an [Informer](https://www.javadoc.io/static/io.fabric8/kubernetes-client-api/6.1.1/io/fabric8/kubernetes/client/informers/SharedIndexInformer.html), similar to a [Watch](https://www.javadoc.io/static/io.fabric8/kubernetes-client-api/6.1.1/io/fabric8/kubernetes/client/Watch.html), able to observe the events on the service instances resources.

--8<-- "src/main/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscovery.java"

Note that:
- the cache is invalidated when an event is received.
- the cache is validated once the instances are retrieved from the cluster, in the `fetchNewServiceInstances` method.
- the `cache` method is overrided to customize the expiration strategy. In this case the collection of service instances will be kept until an event occurs.


Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.slf4j.Logger;
Expand All @@ -22,8 +23,9 @@
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.smallrye.mutiny.Uni;
import io.smallrye.stork.api.Metadata;
import io.smallrye.stork.api.ServiceInstance;
Expand All @@ -50,6 +52,8 @@ public class KubernetesServiceDiscovery extends CachingServiceDiscovery {

private static final Logger LOGGER = LoggerFactory.getLogger(KubernetesServiceDiscovery.class);

private AtomicBoolean invalidated = new AtomicBoolean();

/**
* Creates a new KubernetesServiceDiscovery.
*
Expand All @@ -74,9 +78,39 @@ public KubernetesServiceDiscovery(String serviceName, KubernetesConfiguration co
Config k8sConfig = new ConfigBuilder(base)
.withMasterUrl(masterUrl)
.withNamespace(namespace).build();
this.client = new DefaultKubernetesClient(k8sConfig);
this.client = new KubernetesClientBuilder().withConfig(k8sConfig).build();
this.vertx = vertx;
this.secure = isSecure(config);
client.endpoints().inform(new ResourceEventHandler<Endpoints>() {
@Override
public void onAdd(Endpoints obj) {
LOGGER.info("Endpoint added: {}", obj.getMetadata().getName());
invalidate();
}

@Override
public void onUpdate(Endpoints oldObj, Endpoints newObj) {
LOGGER.info("Endpoint updated : {}", newObj.getMetadata().getName());
invalidate();
}

@Override
public void onDelete(Endpoints obj, boolean deletedFinalStateUnknown) {
LOGGER.info("Endpoint deleted: {}", obj.getMetadata().getName());
invalidate();
}

});

}

@Override
public Uni<List<ServiceInstance>> cache(Uni<List<ServiceInstance>> uni) {
return uni.memoize().until(() -> invalidated.get());
}

public void invalidate() {
invalidated.set(true);
}

@Override
Expand Down Expand Up @@ -129,7 +163,8 @@ public Uni<List<ServiceInstance>> fetchNewServiceInstances(List<ServiceInstance>
}
});
});
return endpointsUni.onItem().transform(endpoints -> toStorkServiceInstances(endpoints, previousInstances));
return endpointsUni.onItem().transform(endpoints -> toStorkServiceInstances(endpoints, previousInstances))
.invoke(() -> invalidated.set(false));
}

private List<ServiceInstance> toStorkServiceInstances(Map<Endpoints, List<Pod>> backend,
Expand Down
Loading