Skip to content

Commit

Permalink
Optimize sharder performance (#38)
Browse files Browse the repository at this point in the history
* Increase retention time of prometheus

* clusterring: reduce unneeded reconciliations

* Replace sort package with generic slices package

* Pre-allocate slice/map correctly

* Use `xxhash.Sum64String`, skip converting to bytes

* Replace inefficient `fmt.Sprintf`

* Drop ring cache

* Refactor `leases` from clock to time

When transforming multiple shard leases, all leases should be transformed using the same timestamp.
This make things slightly more performant and also more consistent.
  • Loading branch information
timebertt authored Dec 1, 2023
1 parent 1ba0f78 commit 418e85d
Show file tree
Hide file tree
Showing 18 changed files with 113 additions and 200 deletions.
7 changes: 2 additions & 5 deletions cmd/sharder/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/metrics"

"github.com/timebertt/kubernetes-controller-sharding/pkg/controller"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/ring"
healthzutils "github.com/timebertt/kubernetes-controller-sharding/pkg/utils/healthz"
"github.com/timebertt/kubernetes-controller-sharding/pkg/webhook"
)
Expand Down Expand Up @@ -114,15 +113,13 @@ func run(ctx context.Context, log logr.Logger, opts *options) error {
return err
}

ringCache := ring.NewCache()

log.Info("Adding controllers to manager")
if err := controller.AddToManager(ctx, mgr, ringCache, opts.config); err != nil {
if err := controller.AddToManager(ctx, mgr, opts.config); err != nil {
return fmt.Errorf("failed adding controllers to manager: %w", err)
}

log.Info("Adding webhooks to manager")
if err := webhook.AddToManager(ctx, mgr, ringCache, opts.config); err != nil {
if err := webhook.AddToManager(ctx, mgr, opts.config); err != nil {
return fmt.Errorf("failed adding webhooks to manager: %w", err)
}

Expand Down
4 changes: 2 additions & 2 deletions hack/config/monitoring/shoot/patch_prometheus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ metadata:
name: k8s
namespace: monitoring
spec:
retention: 7d
retentionSize: 85GiB
retention: 30d
retentionSize: 90GiB
storage:
volumeClaimTemplate:
metadata:
Expand Down
4 changes: 1 addition & 3 deletions pkg/controller/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,10 @@ import (
"github.com/timebertt/kubernetes-controller-sharding/pkg/controller/clusterring"
"github.com/timebertt/kubernetes-controller-sharding/pkg/controller/sharder"
"github.com/timebertt/kubernetes-controller-sharding/pkg/controller/shardlease"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/ring"
)

// AddToManager adds all controllers to the manager.
func AddToManager(ctx context.Context, mgr manager.Manager, ringCache ring.Cache, config *configv1alpha1.SharderConfig) error {
func AddToManager(ctx context.Context, mgr manager.Manager, config *configv1alpha1.SharderConfig) error {
if err := (&clusterring.Reconciler{
Config: config,
}).AddToManager(mgr); err != nil {
Expand All @@ -39,7 +38,6 @@ func AddToManager(ctx context.Context, mgr manager.Manager, ringCache ring.Cache

if err := (&sharder.Reconciler{
Config: config,
Cache: ringCache,
}).AddToManager(mgr); err != nil {
return fmt.Errorf("failed adding sharder controller: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/clusterring/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ func (r *Reconciler) LeasePredicate() predicate.Predicate {
}

// only enqueue ring if the shard's state changed
return leases.ToState(oldLease, r.Clock) != leases.ToState(newLease, r.Clock)
now := r.Clock.Now()
return leases.ToState(oldLease, now).IsAvailable() != leases.ToState(newLease, now).IsAvailable()
},
DeleteFunc: func(_ event.DeleteEvent) bool { return true },
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/clusterring/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
return reconcile.Result{}, r.updateStatusError(ctx, log, fmt.Errorf("error listing Leases for ClusterRing: %w", err), clusterRing, before)
}

shards := leases.ToShards(leaseList.Items, r.Clock)
shards := leases.ToShards(leaseList.Items, r.Clock.Now())
clusterRing.Status.Shards = int32(len(shards))
clusterRing.Status.AvailableShards = int32(len(shards.AvailableShards()))

Expand Down
7 changes: 4 additions & 3 deletions pkg/controller/sharder/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (r *Reconciler) LeasePredicate() predicate.Predicate {

// We only need to resync the ring if the new shard is available right away.
// Note: on controller start we will enqueue anyway for the add event of ClusterRings.
return leases.ToState(lease, r.Clock).IsAvailable()
return leases.ToState(lease, r.Clock.Now()).IsAvailable()
},
UpdateFunc: func(e event.UpdateEvent) bool {
oldLease, ok := e.ObjectOld.(*coordinationv1.Lease)
Expand All @@ -105,7 +105,8 @@ func (r *Reconciler) LeasePredicate() predicate.Predicate {
}

// We only need to resync the ring if the shard's availability changed.
return leases.ToState(oldLease, r.Clock).IsAvailable() != leases.ToState(newLease, r.Clock).IsAvailable()
now := r.Clock.Now()
return leases.ToState(oldLease, now).IsAvailable() != leases.ToState(newLease, now).IsAvailable()
},
DeleteFunc: func(e event.DeleteEvent) bool {
if e.DeleteStateUnknown {
Expand All @@ -123,7 +124,7 @@ func (r *Reconciler) LeasePredicate() predicate.Predicate {
}

// We only need to resync the ring if the removed shard was still available.
return leases.ToState(lease, r.Clock).IsAvailable()
return leases.ToState(lease, r.Clock.Now()).IsAvailable()
},
},
)
Expand Down
4 changes: 1 addition & 3 deletions pkg/controller/sharder/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ type Reconciler struct {
Reader client.Reader
Clock clock.PassiveClock
Config *configv1alpha1.SharderConfig

Cache ring.Cache
}

// Reconcile reconciles a ClusterRing object.
Expand All @@ -86,7 +84,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
}

// get ring and shards from cache
hashRing, shards := r.Cache.Get(clusterRing, leaseList)
hashRing, shards := ring.FromLeases(clusterRing, leaseList, r.Clock.Now())

namespaces, err := r.getSelectedNamespaces(ctx, clusterRing)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/shardlease/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ func (r *Reconciler) LeasePredicate() predicate.Predicate {
return false
}

return leases.ToState(oldLease, r.Clock) != leases.ToState(newLease, r.Clock)
now := r.Clock.Now()
return leases.ToState(oldLease, now) != leases.ToState(newLease, now)
},
DeleteFunc: func(_ event.DeleteEvent) bool { return false },
},
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/shardlease/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
}

var (
now = r.Clock.Now()
previousState = leases.StateFromString(lease.Labels[shardingv1alpha1.LabelState])
shard = leases.ToShard(lease, r.Clock)
shard = leases.ToShard(lease, now)
)
log = log.WithValues("state", shard.State, "expirationTime", shard.Times.Expiration, "leaseDuration", shard.Times.LeaseDuration)

Expand All @@ -99,16 +100,16 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
case leases.Uncertain:
log.Info("Shard lease has expired more than leaseDuration ago, trying to acquire shard lease")

now := metav1.NewMicroTime(r.Clock.Now())
nowMicro := metav1.NewMicroTime(now)
transitions := int32(0)
if lease.Spec.LeaseTransitions != nil {
transitions = *lease.Spec.LeaseTransitions
}

lease.Spec.HolderIdentity = ptr.To(shardingv1alpha1.IdentityShardLeaseController)
lease.Spec.LeaseDurationSeconds = ptr.To(2 * int32(shard.Times.LeaseDuration.Round(time.Second).Seconds()))
lease.Spec.AcquireTime = &now
lease.Spec.RenewTime = &now
lease.Spec.AcquireTime = &nowMicro
lease.Spec.RenewTime = &nowMicro
lease.Spec.LeaseTransitions = ptr.To(transitions + 1)
if err := r.Client.Update(ctx, lease); err != nil {
return reconcile.Result{}, fmt.Errorf("error acquiring shard lease: %w", err)
Expand Down
31 changes: 13 additions & 18 deletions pkg/sharding/consistenthash/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ limitations under the License.
package consistenthash

import (
"fmt"
"sort"
"slices"
"strconv"

"github.com/cespare/xxhash/v2"
)

// Hash is a function computing a 64-bit digest.
type Hash func(data []byte) uint64
type Hash func(data string) uint64

// DefaultHash is the default Hash used by Ring.
var DefaultHash Hash = xxhash.Sum64
var DefaultHash Hash = xxhash.Sum64String

// DefaultTokensPerNode is the default number of virtual nodes per node.
const DefaultTokensPerNode = 100
Expand All @@ -41,12 +41,13 @@ func New(fn Hash, tokensPerNode int, initialNodes ...string) *Ring {
tokensPerNode = DefaultTokensPerNode
}

numTokens := len(initialNodes) * tokensPerNode
r := &Ring{
hash: fn,
tokensPerNode: tokensPerNode,

tokens: make([]uint64, 0, len(initialNodes)*tokensPerNode),
tokenToNode: make(map[uint64]string, len(initialNodes)),
tokens: make([]uint64, 0, numTokens),
tokenToNode: make(map[uint64]string, numTokens),
}
r.AddNodes(initialNodes...)
return r
Expand All @@ -69,32 +70,26 @@ func (r *Ring) IsEmpty() bool {
func (r *Ring) AddNodes(nodes ...string) {
for _, node := range nodes {
for i := 0; i < r.tokensPerNode; i++ {
t := r.hash([]byte(fmt.Sprintf("%s-%d", node, i)))
t := r.hash(node + strconv.FormatInt(int64(i), 10))
r.tokens = append(r.tokens, t)
r.tokenToNode[t] = node
}
}

// sort all tokens on the ring for binary searches
sort.Slice(r.tokens, func(i, j int) bool {
return r.tokens[i] < r.tokens[j]
})
slices.Sort(r.tokens)
}

func (r *Ring) Hash(key string) string {
if r.IsEmpty() {
return ""
}

// Hash key and walk the ring until we find the next virtual node
h := r.hash([]byte(key))
// Hash key and find the next virtual node on the ring
h := r.hash(key)
i, _ := slices.BinarySearch(r.tokens, h)

// binary search
i := sort.Search(len(r.tokens), func(i int) bool {
return r.tokens[i] >= h
})

// walked the whole ring
// walked the whole ring, next virtual node is the first one
if i == len(r.tokens) {
i = 0
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/sharding/leases/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package leases

import (
"k8s.io/utils/clock"
"time"

coordinationv1 "k8s.io/api/coordination/v1"
)
Expand Down Expand Up @@ -50,7 +50,7 @@ func (s Shards) ByID(id string) Shard {

// AvailableShards returns the subset of available Shards as determined by IsAvailable.
func (s Shards) AvailableShards() Shards {
var shards Shards
shards := make(Shards, 0, len(s))
for _, shard := range s {
if shard.State.IsAvailable() {
shards = append(shards, shard)
Expand All @@ -71,18 +71,18 @@ func (s Shards) IDs() []string {
}

// ToShards takes a list of Lease objects and transforms them to a list of Shards.
func ToShards(leases []coordinationv1.Lease, cl clock.PassiveClock) Shards {
func ToShards(leases []coordinationv1.Lease, now time.Time) Shards {
shards := make(Shards, 0, len(leases))
for _, lease := range leases {
l := lease
shards = append(shards, ToShard(&l, cl))
shards = append(shards, ToShard(&l, now))
}
return shards
}

// ToShard takes a Lease object and transforms it to a Shard.
func ToShard(lease *coordinationv1.Lease, cl clock.PassiveClock) Shard {
times := ToTimes(lease, cl)
func ToShard(lease *coordinationv1.Lease, now time.Time) Shard {
times := ToTimes(lease, now)
return Shard{
ID: lease.GetName(),
Times: times,
Expand Down
6 changes: 3 additions & 3 deletions pkg/sharding/leases/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package leases

import (
"k8s.io/utils/clock"
"time"

coordinationv1 "k8s.io/api/coordination/v1"
)
Expand Down Expand Up @@ -83,8 +83,8 @@ func (s ShardState) IsAvailable() bool {
}

// ToState returns the ShardState of the given Lease.
func ToState(lease *coordinationv1.Lease, cl clock.PassiveClock) ShardState {
return toState(lease, ToTimes(lease, cl))
func ToState(lease *coordinationv1.Lease, now time.Time) ShardState {
return toState(lease, ToTimes(lease, now))
}

func toState(lease *coordinationv1.Lease, t Times) ShardState {
Expand Down
5 changes: 1 addition & 4 deletions pkg/sharding/leases/times.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package leases
import (
"time"

"k8s.io/utils/clock"

coordinationv1 "k8s.io/api/coordination/v1"
)

Expand All @@ -46,10 +44,9 @@ type Times struct {
}

// ToTimes parses the times and durations in the given Lease object and returns them in the Times representation.
func ToTimes(lease *coordinationv1.Lease, cl clock.PassiveClock) Times {
func ToTimes(lease *coordinationv1.Lease, now time.Time) Times {
var (
t = Times{}
now = cl.Now()
acquireTime = lease.Spec.AcquireTime
renewTime = lease.Spec.RenewTime
durationSeconds = lease.Spec.LeaseDurationSeconds
Expand Down
Loading

0 comments on commit 418e85d

Please sign in to comment.