diff --git a/cmd/sharder/app/app.go b/cmd/sharder/app/app.go
index d766091f..02f20776 100644
--- a/cmd/sharder/app/app.go
+++ b/cmd/sharder/app/app.go
@@ -36,7 +36,6 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/metrics"
 
 	"github.com/timebertt/kubernetes-controller-sharding/pkg/controller"
-	"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/ring"
 	healthzutils "github.com/timebertt/kubernetes-controller-sharding/pkg/utils/healthz"
 	"github.com/timebertt/kubernetes-controller-sharding/pkg/webhook"
 )
@@ -114,15 +113,13 @@ func run(ctx context.Context, log logr.Logger, opts *options) error {
 		return err
 	}
 
-	ringCache := ring.NewCache()
-
 	log.Info("Adding controllers to manager")
-	if err := controller.AddToManager(ctx, mgr, ringCache, opts.config); err != nil {
+	if err := controller.AddToManager(ctx, mgr, opts.config); err != nil {
 		return fmt.Errorf("failed adding controllers to manager: %w", err)
 	}
 
 	log.Info("Adding webhooks to manager")
-	if err := webhook.AddToManager(ctx, mgr, ringCache, opts.config); err != nil {
+	if err := webhook.AddToManager(ctx, mgr, opts.config); err != nil {
 		return fmt.Errorf("failed adding webhooks to manager: %w", err)
 	}
 
diff --git a/hack/config/monitoring/shoot/patch_prometheus.yaml b/hack/config/monitoring/shoot/patch_prometheus.yaml
index 7aa1fd0f..b92b43bc 100644
--- a/hack/config/monitoring/shoot/patch_prometheus.yaml
+++ b/hack/config/monitoring/shoot/patch_prometheus.yaml
@@ -4,8 +4,8 @@ metadata:
   name: k8s
   namespace: monitoring
 spec:
-  retention: 7d
-  retentionSize: 85GiB
+  retention: 30d
+  retentionSize: 90GiB
   storage:
     volumeClaimTemplate:
       metadata:
diff --git a/pkg/controller/add.go b/pkg/controller/add.go
index 1a471610..473017ff 100644
--- a/pkg/controller/add.go
+++ b/pkg/controller/add.go
@@ -26,11 +26,10 @@ import (
 	"github.com/timebertt/kubernetes-controller-sharding/pkg/controller/clusterring"
 	"github.com/timebertt/kubernetes-controller-sharding/pkg/controller/sharder"
 	"github.com/timebertt/kubernetes-controller-sharding/pkg/controller/shardlease"
-	"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/ring"
 )
 
 // AddToManager adds all controllers to the manager.
-func AddToManager(ctx context.Context, mgr manager.Manager, ringCache ring.Cache, config *configv1alpha1.SharderConfig) error {
+func AddToManager(ctx context.Context, mgr manager.Manager, config *configv1alpha1.SharderConfig) error {
 	if err := (&clusterring.Reconciler{
 		Config: config,
 	}).AddToManager(mgr); err != nil {
@@ -39,7 +38,6 @@ func AddToManager(ctx context.Context, mgr manager.Manager, ringCache ring.Cache
 
 	if err := (&sharder.Reconciler{
 		Config: config,
-		Cache:  ringCache,
 	}).AddToManager(mgr); err != nil {
		return fmt.Errorf("failed adding sharder controller: %w", err)
 	}
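With the ring cache gone, `AddToManager` is pure wiring: each reconciler registers itself with the manager. For readers unfamiliar with the convention, a minimal sketch of what such a registration typically looks like with controller-runtime's builder (a hypothetical reconciler, not code from this repo):

```go
package example

import (
	"context"

	coordinationv1 "k8s.io/api/coordination/v1"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// Reconciler is a hypothetical stand-in for the reconcilers registered above.
type Reconciler struct{}

func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	// reconcile the object identified by req.NamespacedName
	return reconcile.Result{}, nil
}

// AddToManager registers the reconciler with the manager via the builder.
func (r *Reconciler) AddToManager(mgr manager.Manager) error {
	return builder.ControllerManagedBy(mgr).
		For(&coordinationv1.Lease{}).
		Complete(r)
}
```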
diff --git a/pkg/controller/clusterring/add.go b/pkg/controller/clusterring/add.go
index 485ef46a..6c4dee3a 100644
--- a/pkg/controller/clusterring/add.go
+++ b/pkg/controller/clusterring/add.go
@@ -96,7 +96,8 @@ func (r *Reconciler) LeasePredicate() predicate.Predicate {
 			}
 
-			// only enqueue ring if the shard's state changed
-			return leases.ToState(oldLease, r.Clock) != leases.ToState(newLease, r.Clock)
+			// only enqueue ring if the shard's availability changed
+			now := r.Clock.Now()
+			return leases.ToState(oldLease, now).IsAvailable() != leases.ToState(newLease, now).IsAvailable()
 		},
 		DeleteFunc: func(_ event.DeleteEvent) bool { return true },
 	},
diff --git a/pkg/controller/clusterring/reconciler.go b/pkg/controller/clusterring/reconciler.go
index a0b8befd..892c1024 100644
--- a/pkg/controller/clusterring/reconciler.go
+++ b/pkg/controller/clusterring/reconciler.go
@@ -87,7 +87,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
 		return reconcile.Result{}, r.updateStatusError(ctx, log, fmt.Errorf("error listing Leases for ClusterRing: %w", err), clusterRing, before)
 	}
 
-	shards := leases.ToShards(leaseList.Items, r.Clock)
+	shards := leases.ToShards(leaseList.Items, r.Clock.Now())
 	clusterRing.Status.Shards = int32(len(shards))
 	clusterRing.Status.AvailableShards = int32(len(shards.AvailableShards()))
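The predicate now derives both states from a single timestamp and compares availability. To illustrate the new `leases.ToState(lease, now)` signature, a self-contained sketch (the lease values are made up; the exact state semantics live in pkg/sharding/leases):

```go
package main

import (
	"fmt"
	"time"

	coordinationv1 "k8s.io/api/coordination/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"

	"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/leases"
)

func main() {
	now := time.Now()

	// A shard lease renewed 10s ago with a 15s lease duration (made-up values).
	lease := &coordinationv1.Lease{
		Spec: coordinationv1.LeaseSpec{
			HolderIdentity:       ptr.To("shard-1"),
			LeaseDurationSeconds: ptr.To[int32](15),
			AcquireTime:          ptr.To(metav1.NewMicroTime(now.Add(-time.Minute))),
			RenewTime:            ptr.To(metav1.NewMicroTime(now.Add(-10 * time.Second))),
		},
	}

	// Evaluated at "now", the lease has not expired yet.
	fmt.Println(leases.ToState(lease, now).IsAvailable()) // true

	// Evaluated a minute later without a renewal, the shard is not available anymore.
	fmt.Println(leases.ToState(lease, now.Add(time.Minute)).IsAvailable()) // false
}
```

Because `now` is an explicit argument, the same lease can be evaluated at two instants without any clock plumbing.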
diff --git a/pkg/controller/sharder/add.go b/pkg/controller/sharder/add.go
index 49767e0f..34c6ca64 100644
--- a/pkg/controller/sharder/add.go
+++ b/pkg/controller/sharder/add.go
@@ -92,7 +92,7 @@ func (r *Reconciler) LeasePredicate() predicate.Predicate {
 
 			// We only need to resync the ring if the new shard is available right away.
 			// Note: on controller start we will enqueue anyway for the add event of ClusterRings.
-			return leases.ToState(lease, r.Clock).IsAvailable()
+			return leases.ToState(lease, r.Clock.Now()).IsAvailable()
 		},
 		UpdateFunc: func(e event.UpdateEvent) bool {
 			oldLease, ok := e.ObjectOld.(*coordinationv1.Lease)
@@ -105,7 +105,8 @@ func (r *Reconciler) LeasePredicate() predicate.Predicate {
 			}
 
 			// We only need to resync the ring if the shard's availability changed.
-			return leases.ToState(oldLease, r.Clock).IsAvailable() != leases.ToState(newLease, r.Clock).IsAvailable()
+			now := r.Clock.Now()
+			return leases.ToState(oldLease, now).IsAvailable() != leases.ToState(newLease, now).IsAvailable()
 		},
 		DeleteFunc: func(e event.DeleteEvent) bool {
 			if e.DeleteStateUnknown {
@@ -123,7 +124,7 @@ func (r *Reconciler) LeasePredicate() predicate.Predicate {
 			}
 
 			// We only need to resync the ring if the removed shard was still available.
-			return leases.ToState(lease, r.Clock).IsAvailable()
+			return leases.ToState(lease, r.Clock.Now()).IsAvailable()
 		},
 	},
 )
diff --git a/pkg/controller/sharder/reconciler.go b/pkg/controller/sharder/reconciler.go
index 39c52e88..c009fadd 100644
--- a/pkg/controller/sharder/reconciler.go
+++ b/pkg/controller/sharder/reconciler.go
@@ -60,8 +60,6 @@ type Reconciler struct {
 	Reader client.Reader
 	Clock  clock.PassiveClock
 	Config *configv1alpha1.SharderConfig
-
-	Cache ring.Cache
 }
 
 // Reconcile reconciles a ClusterRing object.
@@ -86,7 +84,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
 	}
 
-	// get ring and shards from cache
-	hashRing, shards := r.Cache.Get(clusterRing, leaseList)
+	// calculate hash ring and shards from the shard leases
+	hashRing, shards := ring.FromLeases(clusterRing, leaseList, r.Clock.Now())
 
 	namespaces, err := r.getSelectedNamespaces(ctx, clusterRing)
 	if err != nil {
diff --git a/pkg/controller/shardlease/add.go b/pkg/controller/shardlease/add.go
index 15f5824f..d800e9cf 100644
--- a/pkg/controller/shardlease/add.go
+++ b/pkg/controller/shardlease/add.go
@@ -76,7 +76,8 @@ func (r *Reconciler) LeasePredicate() predicate.Predicate {
 				return false
 			}
 
-			return leases.ToState(oldLease, r.Clock) != leases.ToState(newLease, r.Clock)
+			now := r.Clock.Now()
+			return leases.ToState(oldLease, now) != leases.ToState(newLease, now)
 		},
 		DeleteFunc: func(_ event.DeleteEvent) bool { return false },
 	},
diff --git a/pkg/controller/shardlease/reconciler.go b/pkg/controller/shardlease/reconciler.go
index 708b0b0c..a0f82812 100644
--- a/pkg/controller/shardlease/reconciler.go
+++ b/pkg/controller/shardlease/reconciler.go
@@ -71,8 +71,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
 	}
 
 	var (
+		now           = r.Clock.Now()
 		previousState = leases.StateFromString(lease.Labels[shardingv1alpha1.LabelState])
-		shard         = leases.ToShard(lease, r.Clock)
+		shard         = leases.ToShard(lease, now)
 	)
 
 	log = log.WithValues("state", shard.State, "expirationTime", shard.Times.Expiration, "leaseDuration", shard.Times.LeaseDuration)
@@ -99,7 +100,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
 	case leases.Uncertain:
 		log.Info("Shard lease has expired more than leaseDuration ago, trying to acquire shard lease")
 
-		now := metav1.NewMicroTime(r.Clock.Now())
+		nowMicro := metav1.NewMicroTime(now)
 		transitions := int32(0)
 		if lease.Spec.LeaseTransitions != nil {
 			transitions = *lease.Spec.LeaseTransitions
@@ -107,8 +108,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
 		}
 
 		lease.Spec.HolderIdentity = ptr.To(shardingv1alpha1.IdentityShardLeaseController)
 		lease.Spec.LeaseDurationSeconds = ptr.To(2 * int32(shard.Times.LeaseDuration.Round(time.Second).Seconds()))
-		lease.Spec.AcquireTime = &now
-		lease.Spec.RenewTime = &now
+		lease.Spec.AcquireTime = &nowMicro
+		lease.Spec.RenewTime = &nowMicro
 		lease.Spec.LeaseTransitions = ptr.To(transitions + 1)
 		if err := r.Client.Update(ctx, lease); err != nil {
 			return reconcile.Result{}, fmt.Errorf("error acquiring shard lease: %w", err)
 		}
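All predicates above snapshot the clock once per event. The point of the snapshot: with two separate `Clock.Now()` calls, a lease can cross its expiration between the two `ToState` evaluations, so an unchanged lease can look like a transition, or a real transition can be masked. A sketch of the resulting pattern:

```go
package example

import (
	coordinationv1 "k8s.io/api/coordination/v1"
	"k8s.io/utils/clock"

	"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/leases"
)

// availabilityChanged mirrors the update predicates above: both lease objects
// are evaluated at the same instant, so a lease expiring between two separate
// Now() calls can neither fake a transition nor hide one.
func availabilityChanged(cl clock.PassiveClock, oldLease, newLease *coordinationv1.Lease) bool {
	now := cl.Now()
	return leases.ToState(oldLease, now).IsAvailable() != leases.ToState(newLease, now).IsAvailable()
}
```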
diff --git a/pkg/sharding/consistenthash/ring.go b/pkg/sharding/consistenthash/ring.go
index 24433f6e..10624490 100644
--- a/pkg/sharding/consistenthash/ring.go
+++ b/pkg/sharding/consistenthash/ring.go
@@ -17,17 +17,17 @@ limitations under the License.
 package consistenthash
 
 import (
-	"fmt"
-	"sort"
+	"slices"
+	"strconv"
 
 	"github.com/cespare/xxhash/v2"
 )
 
 // Hash is a function computing a 64-bit digest.
-type Hash func(data []byte) uint64
+type Hash func(data string) uint64
 
 // DefaultHash is the default Hash used by Ring.
-var DefaultHash Hash = xxhash.Sum64
+var DefaultHash Hash = xxhash.Sum64String
 
 // DefaultTokensPerNode is the default number of virtual nodes per node.
 const DefaultTokensPerNode = 100
@@ -41,12 +41,13 @@ func New(fn Hash, tokensPerNode int, initialNodes ...string) *Ring {
 		tokensPerNode = DefaultTokensPerNode
 	}
 
+	numTokens := len(initialNodes) * tokensPerNode
 	r := &Ring{
 		hash:          fn,
 		tokensPerNode: tokensPerNode,
-		tokens:        make([]uint64, 0, len(initialNodes)*tokensPerNode),
-		tokenToNode:   make(map[uint64]string, len(initialNodes)),
+		tokens:        make([]uint64, 0, numTokens),
+		tokenToNode:   make(map[uint64]string, numTokens),
 	}
 	r.AddNodes(initialNodes...)
 	return r
@@ -69,16 +70,14 @@ func (r *Ring) IsEmpty() bool {
 func (r *Ring) AddNodes(nodes ...string) {
 	for _, node := range nodes {
 		for i := 0; i < r.tokensPerNode; i++ {
-			t := r.hash([]byte(fmt.Sprintf("%s-%d", node, i)))
+			t := r.hash(node + strconv.FormatInt(int64(i), 10))
 			r.tokens = append(r.tokens, t)
 			r.tokenToNode[t] = node
 		}
 	}
 
 	// sort all tokens on the ring for binary searches
-	sort.Slice(r.tokens, func(i, j int) bool {
-		return r.tokens[i] < r.tokens[j]
-	})
+	slices.Sort(r.tokens)
 }
 
 func (r *Ring) Hash(key string) string {
@@ -86,15 +85,11 @@ func (r *Ring) Hash(key string) string {
 		return ""
 	}
 
-	// Hash key and walk the ring until we find the next virtual node
-	h := r.hash([]byte(key))
+	// Hash key and find the next virtual node on the ring
+	h := r.hash(key)
+	i, _ := slices.BinarySearch(r.tokens, h)
 
-	// binary search
-	i := sort.Search(len(r.tokens), func(i int) bool {
-		return r.tokens[i] >= h
-	})
-
-	// walked the whole ring
+	// walked the whole ring, next virtual node is the first one
 	if i == len(r.tokens) {
 		i = 0
 	}
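The reworked ring keeps the same consistent-hashing behavior (virtual nodes, sorted token slice, binary search) while switching to string hashing and the stdlib `slices` helpers. Usage, as far as the exported API above shows (that `New(nil, 0, ...)` falls back to the defaults is inferred from `ring.FromLeases` below passing nil and 0):

```go
package main

import (
	"fmt"

	"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/consistenthash"
)

func main() {
	// nil hash and 0 tokensPerNode fall back to DefaultHash and DefaultTokensPerNode.
	ring := consistenthash.New(nil, 0, "shard-a", "shard-b", "shard-c")

	// Keys are hashed onto the ring and assigned to the next virtual node.
	fmt.Println(ring.Hash("default/my-object")) // deterministically one of the three shards

	// Adding a node only remaps the keys that now fall into the new node's
	// token ranges; most keys keep their previous assignment.
	ring.AddNodes("shard-d")
	fmt.Println(ring.Hash("default/my-object"))
}
```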
diff --git a/pkg/sharding/leases/shards.go b/pkg/sharding/leases/shards.go
index c76491eb..fd57d0b9 100644
--- a/pkg/sharding/leases/shards.go
+++ b/pkg/sharding/leases/shards.go
@@ -17,7 +17,7 @@ limitations under the License.
 package leases
 
 import (
-	"k8s.io/utils/clock"
+	"time"
 
 	coordinationv1 "k8s.io/api/coordination/v1"
 )
@@ -50,7 +50,7 @@ func (s Shards) ByID(id string) Shard {
 
 // AvailableShards returns the subset of available Shards as determined by IsAvailable.
 func (s Shards) AvailableShards() Shards {
-	var shards Shards
+	shards := make(Shards, 0, len(s))
 	for _, shard := range s {
 		if shard.State.IsAvailable() {
 			shards = append(shards, shard)
@@ -71,18 +71,18 @@ func (s Shards) IDs() []string {
 }
 
 // ToShards takes a list of Lease objects and transforms them to a list of Shards.
-func ToShards(leases []coordinationv1.Lease, cl clock.PassiveClock) Shards {
+func ToShards(leases []coordinationv1.Lease, now time.Time) Shards {
 	shards := make(Shards, 0, len(leases))
 	for _, lease := range leases {
 		l := lease
-		shards = append(shards, ToShard(&l, cl))
+		shards = append(shards, ToShard(&l, now))
 	}
 
 	return shards
 }
 
 // ToShard takes a Lease object and transforms it to a Shard.
-func ToShard(lease *coordinationv1.Lease, cl clock.PassiveClock) Shard {
-	times := ToTimes(lease, cl)
+func ToShard(lease *coordinationv1.Lease, now time.Time) Shard {
+	times := ToTimes(lease, now)
 	return Shard{
 		ID:    lease.GetName(),
 		Times: times,
diff --git a/pkg/sharding/leases/state.go b/pkg/sharding/leases/state.go
index cf52154f..60f26d74 100644
--- a/pkg/sharding/leases/state.go
+++ b/pkg/sharding/leases/state.go
@@ -17,7 +17,7 @@ limitations under the License.
 package leases
 
 import (
-	"k8s.io/utils/clock"
+	"time"
 
 	coordinationv1 "k8s.io/api/coordination/v1"
 )
@@ -83,8 +83,8 @@ func (s ShardState) IsAvailable() bool {
 }
 
 // ToState returns the ShardState of the given Lease.
-func ToState(lease *coordinationv1.Lease, cl clock.PassiveClock) ShardState {
-	return toState(lease, ToTimes(lease, cl))
+func ToState(lease *coordinationv1.Lease, now time.Time) ShardState {
+	return toState(lease, ToTimes(lease, now))
 }
 
 func toState(lease *coordinationv1.Lease, t Times) ShardState {
diff --git a/pkg/sharding/leases/times.go b/pkg/sharding/leases/times.go
index 58693468..6390e0ef 100644
--- a/pkg/sharding/leases/times.go
+++ b/pkg/sharding/leases/times.go
@@ -19,8 +19,6 @@ package leases
 
 import (
 	"time"
 
-	"k8s.io/utils/clock"
-
 	coordinationv1 "k8s.io/api/coordination/v1"
 )
@@ -46,10 +44,9 @@ type Times struct {
 }
 
 // ToTimes parses the times and durations in the given Lease object and returns them in the Times representation.
-func ToTimes(lease *coordinationv1.Lease, cl clock.PassiveClock) Times {
+func ToTimes(lease *coordinationv1.Lease, now time.Time) Times {
 	var (
 		t               = Times{}
-		now             = cl.Now()
 		acquireTime     = lease.Spec.AcquireTime
 		renewTime       = lease.Spec.RenewTime
 		durationSeconds = lease.Spec.LeaseDurationSeconds
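After this change the whole leases package is a set of pure functions of `now`. The helpers compose into a small pipeline; a sketch using only signatures from this diff:

```go
package example

import (
	"time"

	coordinationv1 "k8s.io/api/coordination/v1"

	"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/leases"
)

// availableShardIDs shows the pipeline these helpers form: Lease objects in,
// IDs of the currently available shards out, evaluated at a caller-chosen
// instant. A test can pass a fixed time.Time instead of wiring a fake clock
// through every call.
func availableShardIDs(leaseList *coordinationv1.LeaseList, now time.Time) []string {
	return leases.ToShards(leaseList.Items, now).AvailableShards().IDs()
}
```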
diff --git a/pkg/sharding/ring/cache.go b/pkg/sharding/ring/cache.go
deleted file mode 100644
index 013494ee..00000000
--- a/pkg/sharding/ring/cache.go
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
-Copyright 2023 Tim Ebert.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package ring
-
-import (
-	"fmt"
-	"sort"
-	"sync"
-
-	"github.com/cespare/xxhash/v2"
-	coordinationv1 "k8s.io/api/coordination/v1"
-	"k8s.io/utils/clock"
-	"sigs.k8s.io/controller-runtime/pkg/client"
-
-	shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
-	"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/consistenthash"
-	"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/leases"
-	shardingmetrics "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/metrics"
-)
-
-// Cache caches lease data and hash rings to reduce expensive ring calculations needed in every assignment operation
-// in the sharder's webhooks or controllers.
-// It rebuilds the hash ring from the given LeaseList if necessary.
-type Cache interface {
-	// Get returns the cached ring and shard information for a given ClusterRing.
-	Get(ring client.Object, leaseList *coordinationv1.LeaseList) (*consistenthash.Ring, leases.Shards)
-}
-
-// NewCache returns a new Cache.
-func NewCache() Cache {
-	return &cache{
-		Clock:         clock.RealClock{},
-		TokensPerNode: consistenthash.DefaultTokensPerNode,
-		cache:         make(map[string]element),
-	}
-}
-
-type cache struct {
-	Clock         clock.Clock
-	TokensPerNode int
-
-	lock  sync.RWMutex
-	cache map[string]element
-}
-
-type element struct {
-	checksum uint64
-	ring     *consistenthash.Ring
-	shards   leases.Shards
-}
-
-func (c *cache) Get(ringObj client.Object, leaseList *coordinationv1.LeaseList) (*consistenthash.Ring, leases.Shards) {
-	var kind, key string
-
-	switch ringObj.(type) {
-	case *shardingv1alpha1.ClusterRing:
-		kind = shardingv1alpha1.KindClusterRing
-		key = ringObj.GetName()
-	default:
-		panic(fmt.Errorf("unexpected kind %T", ringObj))
-	}
-
-	checksum := leaseListChecksum(leaseList)
-
-	// fast pre-check if cache is up-to-date
-	c.lock.RLock()
-	ring, ok := c.cache[key]
-	if ok && ring.checksum == checksum {
-		defer c.lock.RUnlock()
-		return ring.ring, ring.shards
-	}
-
-	// upgrade lock to prepare writing
-	c.lock.RUnlock()
-	c.lock.Lock()
-	defer c.lock.Unlock()
-
-	// re-check if cache was updated concurrently in the meantime
-	ring, ok = c.cache[key]
-	if ok && ring.checksum == checksum {
-		return ring.ring, ring.shards
-	}
-
-	// set cached checksum
-	ring.checksum = checksum
-
-	// determine ready shards and calculate hash ring
-	ring.shards = leases.ToShards(leaseList.Items, c.Clock)
-	availableShards := ring.shards.AvailableShards().IDs()
-	ring.ring = consistenthash.New(consistenthash.DefaultHash, c.TokensPerNode, availableShards...)
-
-	// write back ring to cache
-	c.cache[key] = ring
-
-	shardingmetrics.RingCalculationsTotal.WithLabelValues(kind, ringObj.GetNamespace(), ringObj.GetName()).Inc()
-
-	return ring.ring, ring.shards
-}
-
-// leaseListChecksum calculates a fast checksum for the given LeaseList. It avoids expensive checksum calculation of all
-// fields by only including uid and resourceVersion of the leases. When the hash ring needs to be rebuild the checksum
-// is guaranteed to change. The checksum changes much more frequently though.
-// TODO: reduce the rate of ring calculation when this becomes a performance/resource panalty
-func leaseListChecksum(leaseList *coordinationv1.LeaseList) uint64 {
-	ll := make([]string, len(leaseList.Items))
-	for i, l := range leaseList.Items {
-		ll[i] = string(l.UID) + ";" + l.ResourceVersion + ";"
-	}
-
-	// ensure stable checksum
-	sort.Strings(ll)
-
-	hash := xxhash.New()
-	for _, l := range ll {
-		_, _ = hash.WriteString(l)
-	}
-
-	return hash.Sum64()
-}
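For context on what was deleted: the cache keyed hash rings by ring name, guarded them with a checksum over lease UIDs and resource versions, and used double-checked locking so concurrent webhook requests didn't rebuild the same ring. The locking pattern in isolation, as a generic sketch (not the deleted code itself):

```go
package example

import "sync"

// memo is a generic sketch of the deleted cache's concurrency scheme:
// an RWMutex read path for the hot case, re-checked after upgrading to the
// write lock because another goroutine may have filled the entry in between.
type memo[K comparable, V any] struct {
	mu    sync.RWMutex
	items map[K]V
}

func newMemo[K comparable, V any]() *memo[K, V] {
	return &memo[K, V]{items: make(map[K]V)}
}

func (m *memo[K, V]) get(key K, compute func() V) V {
	m.mu.RLock()
	v, ok := m.items[key]
	m.mu.RUnlock()
	if ok {
		return v
	}

	m.mu.Lock()
	defer m.mu.Unlock()
	if v, ok := m.items[key]; ok { // re-check under the write lock
		return v
	}
	v = compute()
	m.items[key] = v
	return v
}
```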
diff --git a/pkg/sharding/ring/ring.go b/pkg/sharding/ring/ring.go
new file mode 100644
index 00000000..5700f89f
--- /dev/null
+++ b/pkg/sharding/ring/ring.go
@@ -0,0 +1,55 @@
+/*
+Copyright 2023 Tim Ebert.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package ring
+
+import (
+	"fmt"
+	"time"
+
+	coordinationv1 "k8s.io/api/coordination/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
+	"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/consistenthash"
+	"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/leases"
+	shardingmetrics "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/metrics"
+)
+
+// FromLeases creates a ring from the given membership information (shard leases). It transforms shard leases into a
+// usable form, i.e., a hash ring and leases.Shards.
+// This is a central function in the sharding implementation bringing together the leases package with the
+// consistenthash package.
+// In short, it determines the subset of available shards and constructs a new consistenthash.Ring with it.
+func FromLeases(ringObj client.Object, leaseList *coordinationv1.LeaseList, now time.Time) (*consistenthash.Ring, leases.Shards) {
+	var kind string
+
+	switch ringObj.(type) {
+	case *shardingv1alpha1.ClusterRing:
+		kind = shardingv1alpha1.KindClusterRing
+	default:
+		panic(fmt.Errorf("unexpected kind %T", ringObj))
+	}
+
+	// determine ready shards and calculate hash ring
+	shards := leases.ToShards(leaseList.Items, now)
+	availableShards := shards.AvailableShards().IDs()
+	ring := consistenthash.New(nil, 0, availableShards...)
+
+	shardingmetrics.RingCalculationsTotal.WithLabelValues(kind, ringObj.GetNamespace(), ringObj.GetName()).Inc()
+
+	return ring, shards
+}
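`ring.FromLeases` replaces `Cache.Get` as the single entry point for turning shard leases into an assignable ring. A sketch of the call path now shared by the sharder controller and webhook, assuming the ClusterRing `LeaseSelector` helper used elsewhere in this diff:

```go
package example

import (
	"context"
	"fmt"
	"time"

	coordinationv1 "k8s.io/api/coordination/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
	"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/ring"
)

// shardForKey sketches the shared call path: list the ring's shard leases,
// build the hash ring, and hash the object's key onto it.
func shardForKey(ctx context.Context, c client.Reader, clusterRing *shardingv1alpha1.ClusterRing, key string) (string, error) {
	leaseList := &coordinationv1.LeaseList{}
	if err := c.List(ctx, leaseList, client.MatchingLabelsSelector{Selector: clusterRing.LeaseSelector()}); err != nil {
		return "", fmt.Errorf("error listing Leases: %w", err)
	}

	hashRing, _ := ring.FromLeases(clusterRing, leaseList, time.Now())
	// Hash returns "" when no shard is available (empty ring).
	return hashRing.Hash(key), nil
}
```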
diff --git a/pkg/webhook/add.go b/pkg/webhook/add.go
index 83a774e9..9e07c9a4 100644
--- a/pkg/webhook/add.go
+++ b/pkg/webhook/add.go
@@ -23,15 +23,12 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 
 	configv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/config/v1alpha1"
-	"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/ring"
 	"github.com/timebertt/kubernetes-controller-sharding/pkg/webhook/sharder"
 )
 
 // AddToManager adds all webhooks to the manager.
-func AddToManager(ctx context.Context, mgr manager.Manager, ringCache ring.Cache, config *configv1alpha1.SharderConfig) error {
-	if err := (&sharder.Handler{
-		Cache: ringCache,
-	}).AddToManager(mgr); err != nil {
+func AddToManager(ctx context.Context, mgr manager.Manager, config *configv1alpha1.SharderConfig) error {
+	if err := (&sharder.Handler{}).AddToManager(mgr); err != nil {
 		return fmt.Errorf("failed adding sharder webhook: %w", err)
 	}
 
diff --git a/pkg/webhook/sharder/add.go b/pkg/webhook/sharder/add.go
index d1acab39..214b0e70 100644
--- a/pkg/webhook/sharder/add.go
+++ b/pkg/webhook/sharder/add.go
@@ -24,6 +24,7 @@ import (
 	"strings"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/utils/clock"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
@@ -44,6 +45,9 @@ func (h *Handler) AddToManager(mgr manager.Manager) error {
 	if h.Reader == nil {
 		h.Reader = mgr.GetCache()
 	}
+	if h.Clock == nil {
+		h.Clock = clock.RealClock{}
+	}
 
 	mgr.GetWebhookServer().Register(WebhookPathPrefix, &admission.Webhook{
 		Handler: h,
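The nil guards keep the handler's dependencies injectable: production wiring gets the manager's cache and a real clock, while tests can pin the handler's notion of "now". A minimal sketch using the fake clock from k8s.io/utils/clock/testing:

```go
package sharder_test

import (
	"testing"
	"time"

	clocktesting "k8s.io/utils/clock/testing"

	"github.com/timebertt/kubernetes-controller-sharding/pkg/webhook/sharder"
)

func TestClockInjection(t *testing.T) {
	fixed := time.Date(2023, time.December, 1, 12, 0, 0, 0, time.UTC)

	// Setting Clock before AddToManager prevents the RealClock default,
	// pinning lease evaluation to a fixed instant.
	h := &sharder.Handler{Clock: clocktesting.NewFakePassiveClock(fixed)}

	if !h.Clock.Now().Equal(fixed) {
		t.Errorf("expected handler to observe fixed time %s", fixed)
	}
}
```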
diff --git a/pkg/webhook/sharder/handler.go b/pkg/webhook/sharder/handler.go
index 81a5dbd9..9339ed17 100644
--- a/pkg/webhook/sharder/handler.go
+++ b/pkg/webhook/sharder/handler.go
@@ -26,6 +26,7 @@ import (
 	coordinationv1 "k8s.io/api/coordination/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/json"
+	"k8s.io/utils/clock"
 	"k8s.io/utils/ptr"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	logf "sigs.k8s.io/controller-runtime/pkg/log"
@@ -40,13 +41,13 @@ import (
 
-// Handler handles admission requests and invalidates the static token in Secret resources related to ServiceAccounts.
+// Handler handles admission requests and assigns the object to a shard of the requested ring.
 type Handler struct {
 	Reader client.Reader
-	Cache  ring.Cache
+	Clock  clock.PassiveClock
 }
 
 func (h *Handler) Handle(ctx context.Context, req admission.Request) admission.Response {
 	log := logf.FromContext(ctx)
 
-	ring, err := RingForRequest(ctx, h.Reader)
+	ringObj, err := RingForRequest(ctx, h.Reader)
 	if err != nil {
 		return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error determining ring for request: %w", err))
 	}
@@ -58,7 +59,7 @@ func (h *Handler) Handle(ctx context.Context, req admission.Request) admission.R
 		return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error decoding object: %w", err))
 	}
 
-	labelShard := ring.LabelShard()
+	labelShard := ringObj.LabelShard()
 	// Don't touch labels that the object already has, we can't simply reassign it because the active shard might still
 	// be working on it.
@@ -69,7 +70,7 @@ func (h *Handler) Handle(ctx context.Context, req admission.Request) admission.R
 
 	keyFunc, err := sharding.KeyFuncForResource(metav1.GroupResource{
 		Group:    req.Resource.Group,
 		Resource: req.Resource.Resource,
-	}, ring)
+	}, ringObj)
 	if err != nil {
 		return admission.Errored(http.StatusBadRequest, fmt.Errorf("error determining hash key func for object: %w", err))
 	}
@@ -84,15 +85,15 @@ func (h *Handler) Handle(ctx context.Context, req admission.Request) admission.R
 
 	// collect list of shards in the ring
 	leaseList := &coordinationv1.LeaseList{}
-	if err := h.Reader.List(ctx, leaseList, client.MatchingLabelsSelector{Selector: ring.LeaseSelector()}); err != nil {
+	if err := h.Reader.List(ctx, leaseList, client.MatchingLabelsSelector{Selector: ringObj.LeaseSelector()}); err != nil {
 		return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error listing Leases for ClusterRing: %w", err))
 	}
 
-	// get ring from cache and hash the object onto the ring
-	hashRing, _ := h.Cache.Get(ring, leaseList)
+	// calculate hash ring and hash the object onto the ring
+	hashRing, _ := ring.FromLeases(ringObj, leaseList, h.Clock.Now())
 	shard := hashRing.Hash(key)
 
-	log.V(1).Info("Assigning object for ring", "ring", client.ObjectKeyFromObject(ring), "shard", shard)
+	log.V(1).Info("Assigning object for ring", "ring", client.ObjectKeyFromObject(ringObj), "shard", shard)
 
 	patches := make([]jsonpatch.JsonPatchOperation, 0, 2)
 	if obj.Labels == nil {
@@ -103,7 +104,7 @@ func (h *Handler) Handle(ctx context.Context, req admission.Request) admission.R
 
 	if !ptr.Deref(req.DryRun, false) {
 		shardingmetrics.AssignmentsTotal.WithLabelValues(
-			shardingv1alpha1.KindClusterRing, ring.GetNamespace(), ring.GetName(),
+			shardingv1alpha1.KindClusterRing, ringObj.GetNamespace(), ringObj.GetName(),
 			req.Resource.Group, req.Resource.Resource,
 		).Inc()
 	}
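The handler mutates nothing directly; it answers with JSON patch operations that the API server applies. A sketch of the label patch it builds (the label key here is hypothetical, and slashes in label keys must be escaped per RFC 6901 when used in a JSON pointer):

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"

	jsonpatch "gomodules.xyz/jsonpatch/v2"
)

func main() {
	labelShard := "shard.example.org/clusterring-example" // hypothetical label key
	shard := "shard-1"

	// "/" and "~" in a label key must be escaped as ~1 and ~0 inside a JSON pointer.
	path := "/metadata/labels/" + strings.NewReplacer("~", "~0", "/", "~1").Replace(labelShard)
	patch := jsonpatch.NewOperation("add", path, shard)

	out, _ := json.Marshal(patch)
	fmt.Println(string(out))
	// {"op":"add","path":"/metadata/labels/shard.example.org~1clusterring-example","value":"shard-1"}
}
```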