diff --git a/manifests/reposync-crd.yaml b/manifests/reposync-crd.yaml index e2b7c4f73a..8cb0ee2db9 100644 --- a/manifests/reposync-crd.yaml +++ b/manifests/reposync-crd.yaml @@ -932,6 +932,10 @@ spec: format: date-time nullable: true type: string + objectCount: + description: objectCount is the number of objects detected in + the source of truth, after rendering, if applicable. + type: integer ociStatus: description: ociStatus contains fields describing the status of an OCI source of truth. @@ -1089,6 +1093,10 @@ spec: format: date-time nullable: true type: string + objectCount: + description: objectCount is the number of objects intended to + be synced to the cluster. + type: integer ociStatus: description: ociStatus contains fields describing the status of an OCI source of truth. @@ -2014,6 +2022,10 @@ spec: format: date-time nullable: true type: string + objectCount: + description: objectCount is the number of objects detected in + the source of truth, after rendering, if applicable. + type: integer ociStatus: description: ociStatus contains fields describing the status of an OCI source of truth. @@ -2171,6 +2183,10 @@ spec: format: date-time nullable: true type: string + objectCount: + description: objectCount is the number of objects intended to + be synced to the cluster. + type: integer ociStatus: description: ociStatus contains fields describing the status of an OCI source of truth. diff --git a/manifests/rootsync-crd.yaml b/manifests/rootsync-crd.yaml index 94ea9d2085..e6154ff10d 100644 --- a/manifests/rootsync-crd.yaml +++ b/manifests/rootsync-crd.yaml @@ -987,6 +987,10 @@ spec: format: date-time nullable: true type: string + objectCount: + description: objectCount is the number of objects detected in + the source of truth, after rendering, if applicable. + type: integer ociStatus: description: ociStatus contains fields describing the status of an OCI source of truth. @@ -1144,6 +1148,10 @@ spec: format: date-time nullable: true type: string + objectCount: + description: objectCount is the number of objects intended to + be synced to the cluster. + type: integer ociStatus: description: ociStatus contains fields describing the status of an OCI source of truth. @@ -2124,6 +2132,10 @@ spec: format: date-time nullable: true type: string + objectCount: + description: objectCount is the number of objects detected in + the source of truth, after rendering, if applicable. + type: integer ociStatus: description: ociStatus contains fields describing the status of an OCI source of truth. @@ -2281,6 +2293,10 @@ spec: format: date-time nullable: true type: string + objectCount: + description: objectCount is the number of objects intended to + be synced to the cluster. + type: integer ociStatus: description: ociStatus contains fields describing the status of an OCI source of truth. diff --git a/pkg/api/configsync/v1alpha1/sync_types.go b/pkg/api/configsync/v1alpha1/sync_types.go index 4c18878c26..50eff5581d 100644 --- a/pkg/api/configsync/v1alpha1/sync_types.go +++ b/pkg/api/configsync/v1alpha1/sync_types.go @@ -71,6 +71,11 @@ type SourceStatus struct { // +optional Commit string `json:"commit,omitempty"` + // objectCount is the number of objects detected in the source of truth, + // after rendering, if applicable. + // +optional + ObjectCount int `json:"objectCount,omitempty"` + // lastUpdate is the timestamp of when this status was last updated by a // reconciler. // +nullable @@ -142,6 +147,10 @@ type SyncStatus struct { // +optional Commit string `json:"commit,omitempty"` + // objectCount is the number of objects intended to be synced to the cluster. + // +optional + ObjectCount int `json:"objectCount,omitempty"` + // lastUpdate is the timestamp of when this status was last updated by a // reconciler. // +nullable diff --git a/pkg/api/configsync/v1alpha1/zz_generated.conversion.go b/pkg/api/configsync/v1alpha1/zz_generated.conversion.go index a683d230ff..6a8f6e37e1 100644 --- a/pkg/api/configsync/v1alpha1/zz_generated.conversion.go +++ b/pkg/api/configsync/v1alpha1/zz_generated.conversion.go @@ -1241,6 +1241,7 @@ func autoConvert_v1alpha1_SourceStatus_To_v1beta1_SourceStatus(in *SourceStatus, out.Oci = (*v1beta1.OciStatus)(unsafe.Pointer(in.Oci)) out.Helm = (*v1beta1.HelmStatus)(unsafe.Pointer(in.Helm)) out.Commit = in.Commit + out.ObjectCount = in.ObjectCount out.LastUpdate = in.LastUpdate out.Errors = *(*[]v1beta1.ConfigSyncError)(unsafe.Pointer(&in.Errors)) out.ErrorSummary = (*v1beta1.ErrorSummary)(unsafe.Pointer(in.ErrorSummary)) @@ -1257,6 +1258,7 @@ func autoConvert_v1beta1_SourceStatus_To_v1alpha1_SourceStatus(in *v1beta1.Sourc out.Oci = (*OciStatus)(unsafe.Pointer(in.Oci)) out.Helm = (*HelmStatus)(unsafe.Pointer(in.Helm)) out.Commit = in.Commit + out.ObjectCount = in.ObjectCount out.LastUpdate = in.LastUpdate out.Errors = *(*[]ConfigSyncError)(unsafe.Pointer(&in.Errors)) out.ErrorSummary = (*ErrorSummary)(unsafe.Pointer(in.ErrorSummary)) @@ -1315,6 +1317,7 @@ func autoConvert_v1alpha1_SyncStatus_To_v1beta1_SyncStatus(in *SyncStatus, out * out.Oci = (*v1beta1.OciStatus)(unsafe.Pointer(in.Oci)) out.Helm = (*v1beta1.HelmStatus)(unsafe.Pointer(in.Helm)) out.Commit = in.Commit + out.ObjectCount = in.ObjectCount out.LastUpdate = in.LastUpdate out.Errors = *(*[]v1beta1.ConfigSyncError)(unsafe.Pointer(&in.Errors)) out.ErrorSummary = (*v1beta1.ErrorSummary)(unsafe.Pointer(in.ErrorSummary)) @@ -1331,6 +1334,7 @@ func autoConvert_v1beta1_SyncStatus_To_v1alpha1_SyncStatus(in *v1beta1.SyncStatu out.Oci = (*OciStatus)(unsafe.Pointer(in.Oci)) out.Helm = (*HelmStatus)(unsafe.Pointer(in.Helm)) out.Commit = in.Commit + out.ObjectCount = in.ObjectCount out.LastUpdate = in.LastUpdate out.Errors = *(*[]ConfigSyncError)(unsafe.Pointer(&in.Errors)) out.ErrorSummary = (*ErrorSummary)(unsafe.Pointer(in.ErrorSummary)) diff --git a/pkg/api/configsync/v1beta1/sync_types.go b/pkg/api/configsync/v1beta1/sync_types.go index f5f5291916..5bef877002 100644 --- a/pkg/api/configsync/v1beta1/sync_types.go +++ b/pkg/api/configsync/v1beta1/sync_types.go @@ -71,6 +71,11 @@ type SourceStatus struct { // +optional Commit string `json:"commit,omitempty"` + // objectCount is the number of objects detected in the source of truth, + // after rendering, if applicable. + // +optional + ObjectCount int `json:"objectCount,omitempty"` + // lastUpdate is the timestamp of when this status was last updated by a // reconciler. // +nullable @@ -142,6 +147,10 @@ type SyncStatus struct { // +optional Commit string `json:"commit,omitempty"` + // objectCount is the number of objects intended to be synced to the cluster. + // +optional + ObjectCount int `json:"objectCount,omitempty"` + // lastUpdate is the timestamp of when this status was last updated by a // reconciler. // +nullable diff --git a/pkg/applier/applier.go b/pkg/applier/applier.go index 160bdbb111..efe38532c6 100644 --- a/pkg/applier/applier.go +++ b/pkg/applier/applier.go @@ -27,9 +27,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" "kpt.dev/configsync/pkg/api/configmanagement" "kpt.dev/configsync/pkg/api/configsync" + "kpt.dev/configsync/pkg/api/kpt.dev/v1alpha1" "kpt.dev/configsync/pkg/applier/stats" "kpt.dev/configsync/pkg/core" "kpt.dev/configsync/pkg/declared" @@ -74,9 +76,13 @@ type Applier interface { // source of truth (git, OCI, helm) and periodically. Apply(ctx context.Context, desiredResources []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError) // Errors returns the errors encountered during apply. - // This method may be called while Destroy is running, to get the set of + // This method may be called while Apply is running, to get the set of // errors encountered so far. Errors() status.MultiError + // InventoryObjectCount returns the number of objects in the inventory. + // This method may be called while Apply is running, to get the count from + // the last time the inventory was updated. + InventoryObjectCount() int } // Destroyer is a bulk client for deleting all the managed resource objects @@ -91,6 +97,10 @@ type Destroyer interface { // This method may be called while Destroy is running, to get the set of // errors encountered so far. Errors() status.MultiError + // InventoryObjectCount returns the number of objects in the inventory. + // This method may be called while Apply is running, to get the count from + // the last time the inventory was updated. + InventoryObjectCount() int } // Supervisor is a bulk client for applying and deleting a mutable set of @@ -131,6 +141,11 @@ type supervisor struct { // errs received from the current (if running) or previous Apply/Destroy. // These errors is cleared at the start of the Apply/Destroy methods. errs status.MultiError + + // objectCountMux prevents concurrent modifications to the cached inventory + // object count. + objectCountMux sync.RWMutex + objectCount int } var _ Applier = &supervisor{} @@ -563,6 +578,16 @@ func (a *supervisor) applyInner(ctx context.Context, objs []client.Object) (map[ } case event.ActionGroupType: klog.Info(e.ActionGroupEvent) + if e.ActionGroupEvent.Action == event.InventoryAction && e.ActionGroupEvent.Status == event.Finished { + // Update sync objectCount after inventory is updated + objectCount, err := a.lookupInventoryObjectCount(ctx) + if err != nil { + a.addError(err) + } else { + klog.Infof("Inventory ObjectCount: %d", objectCount) + a.setInventoryObjectCount(objectCount) + } + } case event.ErrorType: klog.Info(e.ErrorEvent) if util.IsRequestTooLargeError(e.ErrorEvent.Err) { @@ -652,6 +677,35 @@ func (a *supervisor) invalidateErrors() { a.errs = nil } +func (a *supervisor) InventoryObjectCount() int { + a.objectCountMux.RLock() + defer a.objectCountMux.RUnlock() + + return a.objectCount +} + +func (a *supervisor) setInventoryObjectCount(objectCount int) { + a.objectCountMux.Lock() + defer a.objectCountMux.Unlock() + + a.objectCount = objectCount +} + +func (a *supervisor) lookupInventoryObjectCount(ctx context.Context) (int, error) { + rg := &v1alpha1.ResourceGroup{} + rgRef := types.NamespacedName{ + Name: a.inventory.Name(), + Namespace: a.inventory.Namespace(), + } + if err := a.clientSet.Client.Get(ctx, rgRef, rg); err != nil { + if apierrors.IsNotFound(err) { + return 0, nil + } + return 0, fmt.Errorf("getting latest inventory: %w", err) + } + return len(rg.Spec.Resources), nil +} + // destroyInner triggers a kpt live destroy library call to destroy a set of resources. func (a *supervisor) destroyInner(ctx context.Context) status.MultiError { s := stats.NewSyncStats() diff --git a/pkg/core/scheme.go b/pkg/core/scheme.go index b0dd98388e..1d442c43aa 100644 --- a/pkg/core/scheme.go +++ b/pkg/core/scheme.go @@ -40,6 +40,7 @@ import ( configsyncv1alpha1 "kpt.dev/configsync/pkg/api/configsync/v1alpha1" configsyncv1beta1 "kpt.dev/configsync/pkg/api/configsync/v1beta1" hubv1 "kpt.dev/configsync/pkg/api/hub/v1" + kptv1alpha1 "kpt.dev/configsync/pkg/api/kpt.dev/v1alpha1" ) // Scheme is a reference to the global scheme. @@ -70,6 +71,7 @@ func init() { utilruntime.Must(configsyncv1alpha1.AddToScheme(scheme.Scheme)) utilruntime.Must(configsyncv1alpha1.RegisterConversions(scheme.Scheme)) utilruntime.Must(scheme.Scheme.SetVersionPriority(configsyncv1beta1.SchemeGroupVersion, configsyncv1alpha1.SchemeGroupVersion)) + utilruntime.Must(kptv1alpha1.AddToScheme(scheme.Scheme)) // Hub/Fleet types utilruntime.Must(hubv1.AddToScheme(scheme.Scheme)) diff --git a/pkg/parse/namespace.go b/pkg/parse/namespace.go index 337f62edd2..5050eb3f1b 100644 --- a/pkg/parse/namespace.go +++ b/pkg/parse/namespace.go @@ -322,6 +322,10 @@ func (p *namespace) SyncErrors() status.MultiError { return p.Errors() } +func (p *namespace) InventoryObjectCount() int { + return p.Applier.InventoryObjectCount() +} + // Syncing returns true if the updater is running. // SyncErrors implements the Parser interface func (p *namespace) Syncing() bool { diff --git a/pkg/parse/opts.go b/pkg/parse/opts.go index 12750aeefc..71d19db848 100644 --- a/pkg/parse/opts.go +++ b/pkg/parse/opts.go @@ -96,6 +96,7 @@ type Parser interface { // SyncErrors returns all the sync errors, including remediator errors, // validation errors, applier errors, and watch update errors. SyncErrors() status.MultiError + InventoryObjectCount() int // Syncing returns true if the updater is running. Syncing() bool // K8sClient returns the Kubernetes client that talks to the API server. diff --git a/pkg/parse/root.go b/pkg/parse/root.go index f0f9ec2fd5..1dbb92b9c4 100644 --- a/pkg/parse/root.go +++ b/pkg/parse/root.go @@ -217,6 +217,7 @@ func (p *root) setSourceStatusWithRetries(ctx context.Context, newStatus sourceS func setSourceStatusFields(source *v1beta1.SourceStatus, p Parser, newStatus sourceStatus, denominator int) { cse := status.ToCSE(newStatus.errs) source.Commit = newStatus.commit + source.ObjectCount = newStatus.objectCount switch p.options().SourceType { case v1beta1.GitSource: source.Git = &v1beta1.GitStatus{ @@ -438,6 +439,7 @@ func (p *root) setSyncStatusWithRetries(ctx context.Context, newStatus syncStatu func setSyncStatusFields(syncStatus *v1beta1.Status, newStatus syncStatus, denominator int) { cse := status.ToCSE(newStatus.errs) syncStatus.Sync.Commit = newStatus.commit + syncStatus.Sync.ObjectCount = newStatus.objectCount syncStatus.Sync.Git = syncStatus.Source.Git syncStatus.Sync.Oci = syncStatus.Source.Oci syncStatus.Sync.Helm = syncStatus.Source.Helm @@ -547,6 +549,10 @@ func (p *root) SyncErrors() status.MultiError { return p.Errors() } +func (p *root) InventoryObjectCount() int { + return p.Applier.InventoryObjectCount() +} + // Syncing returns true if the updater is running. // SyncErrors implements the Parser interface func (p *root) Syncing() bool { diff --git a/pkg/parse/root_test.go b/pkg/parse/root_test.go index d544f94e6a..82427a96d7 100644 --- a/pkg/parse/root_test.go +++ b/pkg/parse/root_test.go @@ -1124,19 +1124,22 @@ func (p *fakeParser) ReadClusterNamesFromSelector(_ reader.FilePaths) ([]string, } type fakeApplier struct { - got []client.Object - errors []status.Error + got []client.Object + errors []status.Error + objectCount int } func (a *fakeApplier) Apply(_ context.Context, objs []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError) { if a.errors == nil { a.got = objs + a.objectCount = len(objs) gvks := make(map[schema.GroupVersionKind]struct{}) for _, obj := range objs { gvks[obj.GetObjectKind().GroupVersionKind()] = struct{}{} } return gvks, nil } + a.objectCount = 0 var errs status.MultiError for _, e := range a.errors { errs = status.Append(errs, e) @@ -1152,6 +1155,10 @@ func (a *fakeApplier) Errors() status.MultiError { return errs } +func (a *fakeApplier) InventoryObjectCount() int { + return a.objectCount +} + func (a *fakeApplier) Syncing() bool { return false } diff --git a/pkg/parse/run.go b/pkg/parse/run.go index f58f8ff6a0..8deb6d3ca9 100644 --- a/pkg/parse/run.go +++ b/pkg/parse/run.go @@ -492,9 +492,10 @@ func parseAndUpdate(ctx context.Context, p Parser, trigger string, state *reconc sourceErrs := parseSource(ctx, p, trigger, state) klog.V(3).Info("Parser stopped") newSourceStatus := sourceStatus{ - commit: state.cache.source.commit, - errs: sourceErrs, - lastUpdate: metav1.Now(), + commit: state.cache.source.commit, + objectCount: len(state.cache.objsToApply), + errs: sourceErrs, + lastUpdate: metav1.Now(), } if state.needToSetSourceStatus(newSourceStatus) { klog.V(3).Infof("Updating source status (after parse): %#v", newSourceStatus) @@ -540,10 +541,11 @@ func parseAndUpdate(ctx context.Context, p Parser, trigger string, state *reconc func setSyncStatus(ctx context.Context, p Parser, state *reconcilerState, syncing bool, syncErrs status.MultiError) error { // Update the RSync status, if necessary newSyncStatus := syncStatus{ - syncing: syncing, - commit: state.cache.source.commit, - errs: syncErrs, - lastUpdate: metav1.Now(), + syncing: syncing, + commit: state.cache.source.commit, + objectCount: p.InventoryObjectCount(), + errs: syncErrs, + lastUpdate: metav1.Now(), } if state.needToSetSyncStatus(newSyncStatus) { if err := p.SetSyncStatus(ctx, newSyncStatus); err != nil { diff --git a/pkg/parse/state.go b/pkg/parse/state.go index 194904b90c..a353d2a873 100644 --- a/pkg/parse/state.go +++ b/pkg/parse/state.go @@ -24,13 +24,14 @@ import ( ) type sourceStatus struct { - commit string - errs status.MultiError - lastUpdate metav1.Time + commit string + objectCount int + errs status.MultiError + lastUpdate metav1.Time } func (gs sourceStatus) equal(other sourceStatus) bool { - return gs.commit == other.commit && status.DeepEqual(gs.errs, other.errs) + return gs.commit == other.commit && gs.objectCount == other.objectCount && status.DeepEqual(gs.errs, other.errs) } type renderingStatus struct { @@ -48,14 +49,15 @@ func (rs renderingStatus) equal(other renderingStatus) bool { } type syncStatus struct { - syncing bool - commit string - errs status.MultiError - lastUpdate metav1.Time + syncing bool + commit string + objectCount int + errs status.MultiError + lastUpdate metav1.Time } func (gs syncStatus) equal(other syncStatus) bool { - return gs.syncing == other.syncing && gs.commit == other.commit && status.DeepEqual(gs.errs, other.errs) + return gs.syncing == other.syncing && gs.commit == other.commit && gs.objectCount == other.objectCount && status.DeepEqual(gs.errs, other.errs) } type reconcilerState struct { diff --git a/pkg/reconciler/finalizer/reposync_finalizer_test.go b/pkg/reconciler/finalizer/reposync_finalizer_test.go index 80271f2fcd..d0ca588a5d 100644 --- a/pkg/reconciler/finalizer/reposync_finalizer_test.go +++ b/pkg/reconciler/finalizer/reposync_finalizer_test.go @@ -291,7 +291,7 @@ func TestRepoSyncFinalize(t *testing.T) { // Return errors, if any return tc.destroyErrs } - fakeDestroyer := newFakeDestroyer(tc.destroyErrs, destroyFunc) + fakeDestroyer := newFakeDestroyer(tc.destroyErrs, 0, destroyFunc) finalizer := &RepoSyncFinalizer{ Destroyer: fakeDestroyer, Client: fakeClient, diff --git a/pkg/reconciler/finalizer/rootsync_finalizer_test.go b/pkg/reconciler/finalizer/rootsync_finalizer_test.go index 2c6af3404d..b706899fd2 100644 --- a/pkg/reconciler/finalizer/rootsync_finalizer_test.go +++ b/pkg/reconciler/finalizer/rootsync_finalizer_test.go @@ -297,7 +297,7 @@ func TestRootSyncFinalize(t *testing.T) { // Return errors, if any return tc.destroyErrs } - fakeDestroyer := newFakeDestroyer(tc.destroyErrs, destroyFunc) + fakeDestroyer := newFakeDestroyer(tc.destroyErrs, 0, destroyFunc) finalizer := &RootSyncFinalizer{ Destroyer: fakeDestroyer, Client: fakeClient, @@ -643,14 +643,16 @@ func updateToRemoveFinalizers(ctx context.Context, fakeClient *fake.Client, obj type fakeDestroyer struct { errs status.MultiError + objectCount int destroyFunc func(context.Context) status.MultiError } var _ applier.Destroyer = &fakeDestroyer{} -func newFakeDestroyer(errs status.MultiError, destroyFunc func(context.Context) status.MultiError) *fakeDestroyer { +func newFakeDestroyer(errs status.MultiError, objectCount int, destroyFunc func(context.Context) status.MultiError) *fakeDestroyer { return &fakeDestroyer{ errs: errs, + objectCount: objectCount, destroyFunc: destroyFunc, } } @@ -665,3 +667,7 @@ func (d *fakeDestroyer) Destroy(ctx context.Context) status.MultiError { func (d *fakeDestroyer) Errors() status.MultiError { return d.errs } + +func (d *fakeDestroyer) InventoryObjectCount() int { + return d.objectCount +} diff --git a/pkg/reconcilermanager/controllers/reconciler_container_resources.go b/pkg/reconcilermanager/controllers/reconciler_container_resources.go index 0c3ca7a849..76cf915367 100644 --- a/pkg/reconcilermanager/controllers/reconciler_container_resources.go +++ b/pkg/reconcilermanager/controllers/reconciler_container_resources.go @@ -77,10 +77,10 @@ func ReconcilerContainerResourceDefaultsForAutopilot() map[string]v1beta1.Contai return map[string]v1beta1.ContainerResourcesSpec{ reconcilermanager.Reconciler: { ContainerName: reconcilermanager.Reconciler, - CPURequest: resource.MustParse("700m"), - CPULimit: resource.MustParse("700m"), - MemoryRequest: resource.MustParse("512Mi"), - MemoryLimit: resource.MustParse("512Mi"), + CPURequest: resource.MustParse("250m"), + CPULimit: resource.MustParse("250m"), + MemoryRequest: resource.MustParse("256Mi"), + MemoryLimit: resource.MustParse("256Mi"), }, reconcilermanager.HydrationController: { ContainerName: reconcilermanager.HydrationController, diff --git a/pkg/reconcilermanager/controllers/reposync_controller.go b/pkg/reconcilermanager/controllers/reposync_controller.go index 6a14ec211e..5ba3f2e4cb 100644 --- a/pkg/reconcilermanager/controllers/reposync_controller.go +++ b/pkg/reconcilermanager/controllers/reposync_controller.go @@ -1190,6 +1190,21 @@ func (r *RepoSyncReconciler) mutationsFor(ctx context.Context, rs *v1beta1.RepoS var containerResourceDefaults map[string]v1beta1.ContainerResourcesSpec if autopilot { containerResourceDefaults = ReconcilerContainerResourceDefaultsForAutopilot() + // Use the larger of the old or new source size for scaling. + // This helps keep memory high for large additions. + objectCount := rs.Status.Sync.ObjectCount + if rs.Status.Source.ObjectCount > objectCount { + objectCount = rs.Status.Source.ObjectCount + } + // Double resources, if over 1000 objects + if objectCount >= 1000 { + defaults := containerResourceDefaults[reconcilermanager.Reconciler] + defaults.CPURequest.Add(defaults.CPURequest) + defaults.CPULimit.Add(defaults.CPULimit) + defaults.MemoryRequest.Add(defaults.MemoryRequest) + defaults.MemoryLimit.Add(defaults.MemoryLimit) + containerResourceDefaults[reconcilermanager.Reconciler] = defaults + } } else { containerResourceDefaults = ReconcilerContainerResourceDefaults() } diff --git a/pkg/reconcilermanager/controllers/rootsync_controller.go b/pkg/reconcilermanager/controllers/rootsync_controller.go index 7ffcbc693a..f210dd7009 100644 --- a/pkg/reconcilermanager/controllers/rootsync_controller.go +++ b/pkg/reconcilermanager/controllers/rootsync_controller.go @@ -1246,6 +1246,21 @@ func (r *RootSyncReconciler) mutationsFor(ctx context.Context, rs *v1beta1.RootS var containerResourceDefaults map[string]v1beta1.ContainerResourcesSpec if autopilot { containerResourceDefaults = ReconcilerContainerResourceDefaultsForAutopilot() + // Use the larger of the old or new source size for scaling. + // This helps keep memory high for large additions. + objectCount := rs.Status.Sync.ObjectCount + if rs.Status.Source.ObjectCount > objectCount { + objectCount = rs.Status.Source.ObjectCount + } + // Double resources, if over 1000 objects + if objectCount >= 1000 { + defaults := containerResourceDefaults[reconcilermanager.Reconciler] + defaults.CPURequest.Add(defaults.CPURequest) + defaults.CPULimit.Add(defaults.CPULimit) + defaults.MemoryRequest.Add(defaults.MemoryRequest) + defaults.MemoryLimit.Add(defaults.MemoryLimit) + containerResourceDefaults[reconcilermanager.Reconciler] = defaults + } } else { containerResourceDefaults = ReconcilerContainerResourceDefaults() }