Skip to content

Commit

Permalink
Cache DiscoveryVariables calls
Browse files Browse the repository at this point in the history
  • Loading branch information
fabriziopandini authored and k8s-infra-cherrypick-robot committed Dec 19, 2024
1 parent b04c79c commit 284c3a5
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 22 deletions.
24 changes: 23 additions & 1 deletion internal/controllers/clusterclass/clusterclass_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ import (
runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1"
"sigs.k8s.io/cluster-api/feature"
runtimeclient "sigs.k8s.io/cluster-api/internal/runtime/client"
runtimeregistry "sigs.k8s.io/cluster-api/internal/runtime/registry"
"sigs.k8s.io/cluster-api/internal/topology/variables"
"sigs.k8s.io/cluster-api/internal/util/cache"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/conversion"
"sigs.k8s.io/cluster-api/util/patch"
Expand All @@ -67,6 +69,10 @@ type Reconciler struct {

// RuntimeClient is a client for calling runtime extensions.
RuntimeClient runtimeclient.Client

// discoverVariablesCache is used to temporarily store the response of a DiscoveryVariables call for
// a specific runtime extension/settings combination.
discoverVariablesCache cache.Cache[runtimeclient.CallExtensionCacheEntry]
}

func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
Expand All @@ -91,6 +97,8 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
if err != nil {
return errors.Wrap(err, "failed setting up with a controller manager")
}

r.discoverVariablesCache = cache.New[runtimeclient.CallExtensionCacheEntry]()
return nil
}

Expand Down Expand Up @@ -302,8 +310,13 @@ func (r *Reconciler) reconcileVariables(ctx context.Context, s *scope) (ctrl.Res
req := &runtimehooksv1.DiscoverVariablesRequest{}
req.Settings = patch.External.Settings

// We temporarily cache the response of a DiscoveryVariables call to improve performance in case there are
// many ClusterClasses using the same runtime extension/settings combination.
// This also mitigates spikes when ClusterClass re-syncs happen or when changes to the ExtensionConfig are applied.
// DiscoverVariables is expected to return a "static" response and usually there are few ExtensionConfigs in a mgmt cluster.
resp := &runtimehooksv1.DiscoverVariablesResponse{}
err := r.RuntimeClient.CallExtension(ctx, runtimehooksv1.DiscoverVariables, clusterClass, *patch.External.DiscoverVariablesExtension, req, resp)
err := r.RuntimeClient.CallExtension(ctx, runtimehooksv1.DiscoverVariables, clusterClass, *patch.External.DiscoverVariablesExtension, req, resp,
runtimeclient.WithCaching{Cache: r.discoverVariablesCache, CacheKeyFunc: cacheKeyFunc})
if err != nil {
errs = append(errs, errors.Wrapf(err, "failed to call DiscoverVariables for patch %s", patch.Name))
continue
Expand Down Expand Up @@ -492,3 +505,12 @@ func matchNamespace(ctx context.Context, c client.Client, selector labels.Select
}
return selector.Matches(labels.Set(ns.GetLabels()))
}

func cacheKeyFunc(registration *runtimeregistry.ExtensionRegistration, request runtimehooksv1.RequestObject) string {
// Note: registration.Name is identical to the value of the patch.External.DiscoverVariablesExtension field in the ClusterClass.
s := fmt.Sprintf("%s-%s", registration.Name, registration.ExtensionConfigResourceVersion)
for k, v := range request.GetSettings() {
s += fmt.Sprintf(",%s=%s", k, v)
}
return s
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ import (
runtimecatalog "sigs.k8s.io/cluster-api/exp/runtime/catalog"
runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1"
"sigs.k8s.io/cluster-api/feature"
runtimeclient "sigs.k8s.io/cluster-api/internal/runtime/client"
fakeruntimeclient "sigs.k8s.io/cluster-api/internal/runtime/client/fake"
"sigs.k8s.io/cluster-api/internal/util/cache"
"sigs.k8s.io/cluster-api/util/test/builder"
)

Expand Down Expand Up @@ -1154,7 +1156,8 @@ func TestReconciler_reconcileVariables(t *testing.T) {
Build()

r := &Reconciler{
RuntimeClient: fakeRuntimeClient,
RuntimeClient: fakeRuntimeClient,
discoverVariablesCache: cache.New[runtimeclient.CallExtensionCacheEntry](),
}

// Pin the compatibility version used in variable CEL validation to 1.29, so we don't have to continuously refactor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (f *fakeRuntimeClient) CallAllExtensions(_ context.Context, _ runtimecatalo
panic("implement me")
}

func (f *fakeRuntimeClient) CallExtension(_ context.Context, _ runtimecatalog.Hook, _ metav1.Object, _ string, request runtimehooksv1.RequestObject, _ runtimehooksv1.ResponseObject) error {
func (f *fakeRuntimeClient) CallExtension(_ context.Context, _ runtimecatalog.Hook, _ metav1.Object, _ string, request runtimehooksv1.RequestObject, _ runtimehooksv1.ResponseObject, _ ...runtimeclient.CallExtensionOption) error {
// Keep a copy of the request object.
// We keep a copy because the request is modified after the call is made. So we keep a copy to perform assertions.
f.callExtensionRequest = request.DeepCopyObject().(runtimehooksv1.RequestObject)
Expand Down
78 changes: 74 additions & 4 deletions internal/runtime/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"net/http"
"net/url"
"path"
"reflect"
"strconv"
"strings"
"time"
Expand All @@ -49,6 +50,7 @@ import (
runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1"
runtimemetrics "sigs.k8s.io/cluster-api/internal/runtime/metrics"
runtimeregistry "sigs.k8s.io/cluster-api/internal/runtime/registry"
"sigs.k8s.io/cluster-api/internal/util/cache"
"sigs.k8s.io/cluster-api/util"
)

Expand Down Expand Up @@ -96,7 +98,7 @@ type Client interface {
CallAllExtensions(ctx context.Context, hook runtimecatalog.Hook, forObject metav1.Object, request runtimehooksv1.RequestObject, response runtimehooksv1.ResponseObject) error

// CallExtension calls the ExtensionHandler with the given name.
CallExtension(ctx context.Context, hook runtimecatalog.Hook, forObject metav1.Object, name string, request runtimehooksv1.RequestObject, response runtimehooksv1.ResponseObject) error
CallExtension(ctx context.Context, hook runtimecatalog.Hook, forObject metav1.Object, name string, request runtimehooksv1.RequestObject, response runtimehooksv1.ResponseObject, opts ...CallExtensionOption) error
}

var _ Client = &client{}
Expand Down Expand Up @@ -276,6 +278,44 @@ func aggregateSuccessfulResponses(aggregatedResponse runtimehooksv1.ResponseObje
aggregatedResponse.SetMessage(strings.Join(messages, ", "))
}

// CallExtensionOption is the interface for configuration that modifies CallExtensionOptions for a CallExtension call.
type CallExtensionOption interface {
// ApplyToOptions applies this configuration to the given CallExtensionOptions.
ApplyToOptions(*CallExtensionOptions)
}

// CallExtensionCacheEntry is a cache entry for the cache that can be used with the CallExtension call via
// the WithCaching option.
type CallExtensionCacheEntry struct {
CacheKey string
Response runtimehooksv1.ResponseObject
}

// Key returns the cache key of a CallExtensionCacheEntry.
func (c CallExtensionCacheEntry) Key() string {
return c.CacheKey
}

// WithCaching enables caching for the CallExtension call.
type WithCaching struct {
Cache cache.Cache[CallExtensionCacheEntry]
CacheKeyFunc func(*runtimeregistry.ExtensionRegistration, runtimehooksv1.RequestObject) string
}

// ApplyToOptions applies WithCaching to the given CallExtensionOptions.
func (w WithCaching) ApplyToOptions(in *CallExtensionOptions) {
in.WithCaching = true
in.Cache = w.Cache
in.CacheKeyFunc = w.CacheKeyFunc
}

// CallExtensionOptions contains the options for the CallExtension call.
type CallExtensionOptions struct {
WithCaching bool
Cache cache.Cache[CallExtensionCacheEntry]
CacheKeyFunc func(*runtimeregistry.ExtensionRegistration, runtimehooksv1.RequestObject) string
}

// CallExtension makes the call to the extension with the given name.
// The response object passed will be updated with the response of the call.
// An error is returned if the extension is not compatible with the hook.
Expand All @@ -288,7 +328,13 @@ func aggregateSuccessfulResponses(aggregatedResponse runtimehooksv1.ResponseObje
// Nb. FailurePolicy does not affect the following kinds of errors:
// - Internal errors. Examples: hooks is incompatible with ExtensionHandler, ExtensionHandler information is missing.
// - Error when ExtensionHandler returns a response with `Status` set to `Failure`.
func (c *client) CallExtension(ctx context.Context, hook runtimecatalog.Hook, forObject metav1.Object, name string, request runtimehooksv1.RequestObject, response runtimehooksv1.ResponseObject) error {
func (c *client) CallExtension(ctx context.Context, hook runtimecatalog.Hook, forObject metav1.Object, name string, request runtimehooksv1.RequestObject, response runtimehooksv1.ResponseObject, opts ...CallExtensionOption) error {
// Calculate the options.
options := &CallExtensionOptions{}
for _, opt := range opts {
opt.ApplyToOptions(options)
}

log := ctrl.LoggerFrom(ctx).WithValues("extensionHandler", name, "hook", runtimecatalog.HookName(hook))
ctx = ctrl.LoggerInto(ctx, log)
hookGVH, err := c.catalog.GroupVersionHook(hook)
Expand Down Expand Up @@ -331,15 +377,31 @@ func (c *client) CallExtension(ctx context.Context, hook runtimecatalog.Hook, fo
// Prepare the request by merging the settings in the registration with the settings in the request.
request = cloneAndAddSettings(request, registration.Settings)

opts := &httpCallOptions{
var cacheKey string
if options.WithCaching {
// Return a cached response if response is cached.
cacheKey = options.CacheKeyFunc(registration, request)
if cacheEntry, ok := options.Cache.Has(cacheKey); ok {
// Set response to cacheEntry.Response.
outVal := reflect.ValueOf(response)
cacheVal := reflect.ValueOf(cacheEntry.Response)
if !cacheVal.Type().AssignableTo(outVal.Type()) {
return fmt.Errorf("failed to call extension handler %q: cached response of type %s instead of type %s", name, cacheVal.Type(), outVal.Type())
}
reflect.Indirect(outVal).Set(reflect.Indirect(cacheVal))
return nil
}
}

httpOpts := &httpCallOptions{
catalog: c.catalog,
config: registration.ClientConfig,
registrationGVH: registration.GroupVersionHook,
hookGVH: hookGVH,
name: strings.TrimSuffix(registration.Name, "."+registration.ExtensionConfigName),
timeout: timeoutDuration,
}
err = httpCall(ctx, request, response, opts)
err = httpCall(ctx, request, response, httpOpts)
if err != nil {
// If the error is errCallingExtensionHandler then apply failure policy to calculate
// the effective result of the operation.
Expand Down Expand Up @@ -368,6 +430,14 @@ func (c *client) CallExtension(ctx context.Context, hook runtimecatalog.Hook, fo
log.V(4).Info("Extension handler returned success response")
}

if options.WithCaching {
// Add response to the cache.
options.Cache.Add(CallExtensionCacheEntry{
CacheKey: cacheKey,
Response: response,
})
}

// Received a successful response from the extension handler. The `response` object
// has been populated with the result. Return no error.
return nil
Expand Down
Loading

0 comments on commit 284c3a5

Please sign in to comment.