Skip to content

Commit

Permalink
[WIP] Prototype autoscaling by object count
Browse files Browse the repository at this point in the history
- Add .status.source.objectCount to track number of objects found
  after parsing from source (after hydration, if any).
- Add .status.sync.objectCount to track number of objects in the
  inventory ResourceGroup's .spec.resources.
- Add resource doubling for the reconciler container when the
  object count (greater of sync & source) is at least 1,000
  (autopilot only).
- Reduce the default resources for the reconciler on autopilot

Notes:
- Since the applier doesn't send the inventory size over events,
  we have to query the ResourceGroup after each inventory update event.
  This could be made more efficient with changes to the applier in
  cli-utils, but isn't really a big deal.
- Autoscaling is currently very trivial and could be fine tuned,
  but this is just a proof of concept to show an MVP and test that
  it would work on our tests.
  • Loading branch information
karlkfi committed Mar 29, 2024
1 parent f0fc85f commit a8fcfa4
Show file tree
Hide file tree
Showing 18 changed files with 194 additions and 26 deletions.
16 changes: 16 additions & 0 deletions manifests/reposync-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,10 @@ spec:
format: date-time
nullable: true
type: string
objectCount:
description: objectCount is the number of objects detected in
the source of truth, after rendering, if applicable.
type: integer
ociStatus:
description: ociStatus contains fields describing the status of
an OCI source of truth.
Expand Down Expand Up @@ -1089,6 +1093,10 @@ spec:
format: date-time
nullable: true
type: string
objectCount:
description: objectCount is the number of objects intended to
be synced to the cluster.
type: integer
ociStatus:
description: ociStatus contains fields describing the status of
an OCI source of truth.
Expand Down Expand Up @@ -2014,6 +2022,10 @@ spec:
format: date-time
nullable: true
type: string
objectCount:
description: objectCount is the number of objects detected in
the source of truth, after rendering, if applicable.
type: integer
ociStatus:
description: ociStatus contains fields describing the status of
an OCI source of truth.
Expand Down Expand Up @@ -2171,6 +2183,10 @@ spec:
format: date-time
nullable: true
type: string
objectCount:
description: objectCount is the number of objects intended to
be synced to the cluster.
type: integer
ociStatus:
description: ociStatus contains fields describing the status of
an OCI source of truth.
Expand Down
16 changes: 16 additions & 0 deletions manifests/rootsync-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,10 @@ spec:
format: date-time
nullable: true
type: string
objectCount:
description: objectCount is the number of objects detected in
the source of truth, after rendering, if applicable.
type: integer
ociStatus:
description: ociStatus contains fields describing the status of
an OCI source of truth.
Expand Down Expand Up @@ -1144,6 +1148,10 @@ spec:
format: date-time
nullable: true
type: string
objectCount:
description: objectCount is the number of objects intended to
be synced to the cluster.
type: integer
ociStatus:
description: ociStatus contains fields describing the status of
an OCI source of truth.
Expand Down Expand Up @@ -2124,6 +2132,10 @@ spec:
format: date-time
nullable: true
type: string
objectCount:
description: objectCount is the number of objects detected in
the source of truth, after rendering, if applicable.
type: integer
ociStatus:
description: ociStatus contains fields describing the status of
an OCI source of truth.
Expand Down Expand Up @@ -2281,6 +2293,10 @@ spec:
format: date-time
nullable: true
type: string
objectCount:
description: objectCount is the number of objects intended to
be synced to the cluster.
type: integer
ociStatus:
description: ociStatus contains fields describing the status of
an OCI source of truth.
Expand Down
9 changes: 9 additions & 0 deletions pkg/api/configsync/v1alpha1/sync_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ type SourceStatus struct {
// +optional
Commit string `json:"commit,omitempty"`

// objectCount is the number of objects detected in the source of truth,
// after rendering, if applicable.
// +optional
ObjectCount int `json:"objectCount,omitempty"`

// lastUpdate is the timestamp of when this status was last updated by a
// reconciler.
// +nullable
Expand Down Expand Up @@ -142,6 +147,10 @@ type SyncStatus struct {
// +optional
Commit string `json:"commit,omitempty"`

// objectCount is the number of objects intended to be synced to the cluster.
// +optional
ObjectCount int `json:"objectCount,omitempty"`

// lastUpdate is the timestamp of when this status was last updated by a
// reconciler.
// +nullable
Expand Down
4 changes: 4 additions & 0 deletions pkg/api/configsync/v1alpha1/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions pkg/api/configsync/v1beta1/sync_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ type SourceStatus struct {
// +optional
Commit string `json:"commit,omitempty"`

// objectCount is the number of objects detected in the source of truth,
// after rendering, if applicable.
// +optional
ObjectCount int `json:"objectCount,omitempty"`

// lastUpdate is the timestamp of when this status was last updated by a
// reconciler.
// +nullable
Expand Down Expand Up @@ -142,6 +147,10 @@ type SyncStatus struct {
// +optional
Commit string `json:"commit,omitempty"`

// objectCount is the number of objects intended to be synced to the cluster.
// +optional
ObjectCount int `json:"objectCount,omitempty"`

// lastUpdate is the timestamp of when this status was last updated by a
// reconciler.
// +nullable
Expand Down
56 changes: 55 additions & 1 deletion pkg/applier/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"kpt.dev/configsync/pkg/api/configmanagement"
"kpt.dev/configsync/pkg/api/configsync"
"kpt.dev/configsync/pkg/api/kpt.dev/v1alpha1"
"kpt.dev/configsync/pkg/applier/stats"
"kpt.dev/configsync/pkg/core"
"kpt.dev/configsync/pkg/declared"
Expand Down Expand Up @@ -74,9 +76,13 @@ type Applier interface {
// source of truth (git, OCI, helm) and periodically.
Apply(ctx context.Context, desiredResources []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError)
// Errors returns the errors encountered during apply.
// This method may be called while Destroy is running, to get the set of
// This method may be called while Apply is running, to get the set of
// errors encountered so far.
Errors() status.MultiError
// InventoryObjectCount returns the number of objects in the inventory.
// This method may be called while Apply is running, to get the count from
// the last time the inventory was updated.
InventoryObjectCount() int
}

// Destroyer is a bulk client for deleting all the managed resource objects
Expand All @@ -91,6 +97,10 @@ type Destroyer interface {
// This method may be called while Destroy is running, to get the set of
// errors encountered so far.
Errors() status.MultiError
// InventoryObjectCount returns the number of objects in the inventory.
// This method may be called while Apply is running, to get the count from
// the last time the inventory was updated.
InventoryObjectCount() int
}

// Supervisor is a bulk client for applying and deleting a mutable set of
Expand Down Expand Up @@ -131,6 +141,11 @@ type supervisor struct {
// errs received from the current (if running) or previous Apply/Destroy.
// These errors is cleared at the start of the Apply/Destroy methods.
errs status.MultiError

// objectCountMux prevents concurrent modifications to the cached inventory
// object count.
objectCountMux sync.RWMutex
objectCount int
}

var _ Applier = &supervisor{}
Expand Down Expand Up @@ -563,6 +578,16 @@ func (a *supervisor) applyInner(ctx context.Context, objs []client.Object) (map[
}
case event.ActionGroupType:
klog.Info(e.ActionGroupEvent)
if e.ActionGroupEvent.Action == event.InventoryAction && e.ActionGroupEvent.Status == event.Finished {
// Update sync objectCount after inventory is updated
objectCount, err := a.lookupInventoryObjectCount(ctx)
if err != nil {
a.addError(err)
} else {
klog.Infof("Inventory ObjectCount: %d", objectCount)
a.setInventoryObjectCount(objectCount)
}
}
case event.ErrorType:
klog.Info(e.ErrorEvent)
if util.IsRequestTooLargeError(e.ErrorEvent.Err) {
Expand Down Expand Up @@ -652,6 +677,35 @@ func (a *supervisor) invalidateErrors() {
a.errs = nil
}

func (a *supervisor) InventoryObjectCount() int {
a.objectCountMux.RLock()
defer a.objectCountMux.RUnlock()

return a.objectCount
}

func (a *supervisor) setInventoryObjectCount(objectCount int) {
a.errorMux.Lock()
defer a.errorMux.Unlock()

a.objectCount = objectCount
}

func (a *supervisor) lookupInventoryObjectCount(ctx context.Context) (int, error) {
rg := &v1alpha1.ResourceGroup{}
rgRef := types.NamespacedName{
Name: a.inventory.Name(),
Namespace: a.inventory.Namespace(),
}
if err := a.clientSet.Client.Get(ctx, rgRef, rg); err != nil {
if apierrors.IsNotFound(err) {
return 0, nil
}
return 0, fmt.Errorf("getting latest inventory: %w", err)
}
return len(rg.Spec.Resources), nil
}

// destroyInner triggers a kpt live destroy library call to destroy a set of resources.
func (a *supervisor) destroyInner(ctx context.Context) status.MultiError {
s := stats.NewSyncStats()
Expand Down
2 changes: 2 additions & 0 deletions pkg/core/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
configsyncv1alpha1 "kpt.dev/configsync/pkg/api/configsync/v1alpha1"
configsyncv1beta1 "kpt.dev/configsync/pkg/api/configsync/v1beta1"
hubv1 "kpt.dev/configsync/pkg/api/hub/v1"
kptv1alpha1 "kpt.dev/configsync/pkg/api/kpt.dev/v1alpha1"
)

// Scheme is a reference to the global scheme.
Expand Down Expand Up @@ -70,6 +71,7 @@ func init() {
utilruntime.Must(configsyncv1alpha1.AddToScheme(scheme.Scheme))
utilruntime.Must(configsyncv1alpha1.RegisterConversions(scheme.Scheme))
utilruntime.Must(scheme.Scheme.SetVersionPriority(configsyncv1beta1.SchemeGroupVersion, configsyncv1alpha1.SchemeGroupVersion))
utilruntime.Must(kptv1alpha1.AddToScheme(scheme.Scheme))

// Hub/Fleet types
utilruntime.Must(hubv1.AddToScheme(scheme.Scheme))
Expand Down
4 changes: 4 additions & 0 deletions pkg/parse/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ func (p *namespace) SyncErrors() status.MultiError {
return p.Errors()
}

func (p *namespace) InventoryObjectCount() int {
return p.Applier.InventoryObjectCount()
}

// Syncing returns true if the updater is running.
// SyncErrors implements the Parser interface
func (p *namespace) Syncing() bool {
Expand Down
1 change: 1 addition & 0 deletions pkg/parse/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type Parser interface {
// SyncErrors returns all the sync errors, including remediator errors,
// validation errors, applier errors, and watch update errors.
SyncErrors() status.MultiError
InventoryObjectCount() int
// Syncing returns true if the updater is running.
Syncing() bool
// K8sClient returns the Kubernetes client that talks to the API server.
Expand Down
6 changes: 6 additions & 0 deletions pkg/parse/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (p *root) setSourceStatusWithRetries(ctx context.Context, newStatus sourceS
func setSourceStatusFields(source *v1beta1.SourceStatus, p Parser, newStatus sourceStatus, denominator int) {
cse := status.ToCSE(newStatus.errs)
source.Commit = newStatus.commit
source.ObjectCount = newStatus.objectCount
switch p.options().SourceType {
case v1beta1.GitSource:
source.Git = &v1beta1.GitStatus{
Expand Down Expand Up @@ -438,6 +439,7 @@ func (p *root) setSyncStatusWithRetries(ctx context.Context, newStatus syncStatu
func setSyncStatusFields(syncStatus *v1beta1.Status, newStatus syncStatus, denominator int) {
cse := status.ToCSE(newStatus.errs)
syncStatus.Sync.Commit = newStatus.commit
syncStatus.Sync.ObjectCount = newStatus.objectCount
syncStatus.Sync.Git = syncStatus.Source.Git
syncStatus.Sync.Oci = syncStatus.Source.Oci
syncStatus.Sync.Helm = syncStatus.Source.Helm
Expand Down Expand Up @@ -547,6 +549,10 @@ func (p *root) SyncErrors() status.MultiError {
return p.Errors()
}

func (p *root) InventoryObjectCount() int {
return p.Applier.InventoryObjectCount()
}

// Syncing returns true if the updater is running.
// SyncErrors implements the Parser interface
func (p *root) Syncing() bool {
Expand Down
11 changes: 9 additions & 2 deletions pkg/parse/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1124,19 +1124,22 @@ func (p *fakeParser) ReadClusterNamesFromSelector(_ reader.FilePaths) ([]string,
}

type fakeApplier struct {
got []client.Object
errors []status.Error
got []client.Object
errors []status.Error
objectCount int
}

func (a *fakeApplier) Apply(_ context.Context, objs []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError) {
if a.errors == nil {
a.got = objs
a.objectCount = len(objs)
gvks := make(map[schema.GroupVersionKind]struct{})
for _, obj := range objs {
gvks[obj.GetObjectKind().GroupVersionKind()] = struct{}{}
}
return gvks, nil
}
a.objectCount = 0
var errs status.MultiError
for _, e := range a.errors {
errs = status.Append(errs, e)
Expand All @@ -1152,6 +1155,10 @@ func (a *fakeApplier) Errors() status.MultiError {
return errs
}

func (a *fakeApplier) InventoryObjectCount() int {
return a.objectCount
}

func (a *fakeApplier) Syncing() bool {
return false
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/parse/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,9 +492,10 @@ func parseAndUpdate(ctx context.Context, p Parser, trigger string, state *reconc
sourceErrs := parseSource(ctx, p, trigger, state)
klog.V(3).Info("Parser stopped")
newSourceStatus := sourceStatus{
commit: state.cache.source.commit,
errs: sourceErrs,
lastUpdate: metav1.Now(),
commit: state.cache.source.commit,
objectCount: len(state.cache.objsToApply),
errs: sourceErrs,
lastUpdate: metav1.Now(),
}
if state.needToSetSourceStatus(newSourceStatus) {
klog.V(3).Infof("Updating source status (after parse): %#v", newSourceStatus)
Expand Down Expand Up @@ -540,10 +541,11 @@ func parseAndUpdate(ctx context.Context, p Parser, trigger string, state *reconc
func setSyncStatus(ctx context.Context, p Parser, state *reconcilerState, syncing bool, syncErrs status.MultiError) error {
// Update the RSync status, if necessary
newSyncStatus := syncStatus{
syncing: syncing,
commit: state.cache.source.commit,
errs: syncErrs,
lastUpdate: metav1.Now(),
syncing: syncing,
commit: state.cache.source.commit,
objectCount: p.InventoryObjectCount(),
errs: syncErrs,
lastUpdate: metav1.Now(),
}
if state.needToSetSyncStatus(newSyncStatus) {
if err := p.SetSyncStatus(ctx, newSyncStatus); err != nil {
Expand Down
Loading

0 comments on commit a8fcfa4

Please sign in to comment.