[WIP] Prototype autoscaling by object count
- Add .status.source.objectCount to track number of objects found
  after parsing from source (after hydration, if any).
- Add .status.sync.objectCount to track number of objects in the
  inventory ResourceGroup's .spec.resources.
- Add resource doubling for the reconciler container when the
  object count (greater of sync & source) is at least 1,000
  (autopilot only).
- Reduce the default resources for the reconciler on autopilot.

Notes:
- Since the applier doesn't send the inventory size over events,
  we have to query the ResourceGroup after each inventory update event.
  This could be made more efficient with changes to the applier in
  cli-utils, but isn't really a big deal.
- Autoscaling is currently very simple and could be fine-tuned,
  but this is just a proof of concept to demonstrate an MVP and
  verify that it works in our tests (a rough sketch of the scaling
  rule follows these notes).
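
A rough sketch of the scaling rule described above, in Go. The base values and the handling of the 1,000-object threshold are illustrative assumptions for the sketch, not the exact constants added by this commit.

```go
// Sketch only: scale reconciler-container resources by object count.
// Base values and the doubling rule are illustrative, not the shipped defaults.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// reconcilerResources returns CPU and memory for the reconciler container,
// doubling both once the object count (greater of source & sync) reaches 1,000.
func reconcilerResources(objectCount int) (cpu, memory resource.Quantity) {
	cpuMilli := int64(250) // assumed reduced autopilot default: 250m
	memMi := int64(256)    // assumed reduced autopilot default: 256Mi
	if objectCount >= 1000 {
		cpuMilli *= 2
		memMi *= 2
	}
	return *resource.NewMilliQuantity(cpuMilli, resource.DecimalSI),
		*resource.NewQuantity(memMi*1024*1024, resource.BinarySI)
}

func main() {
	cpu, mem := reconcilerResources(1500)
	fmt.Printf("cpu=%s memory=%s\n", cpu.String(), mem.String()) // cpu=500m memory=512Mi
}
```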

Filter remediator watch by labels

Add watch label selectors

- Add config.k8s.io/owning-inventory as a label to all managed objects
- Add label selectors to the watchers used by the remediator and applier (see the sketch below)
- Modify cli-utils to allow optional label selector injection
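
For context, a label-filtered watch in client-go looks roughly like the sketch below. The label key matches the commit description; the dynamic-client wiring is illustrative, not the actual remediator or cli-utils code.

```go
// Sketch: restrict a watch to objects owned by a specific inventory.
package sketch

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
)

func watchManagedConfigMaps(ctx context.Context, client dynamic.Interface, inventoryID string) error {
	gvr := schema.GroupVersionResource{Version: "v1", Resource: "configmaps"}
	// Only objects labeled with the owning inventory are delivered to the watcher.
	opts := metav1.ListOptions{
		LabelSelector: fmt.Sprintf("config.k8s.io/owning-inventory=%s", inventoryID),
	}
	w, err := client.Resource(gvr).Namespace(metav1.NamespaceAll).Watch(ctx, opts)
	if err != nil {
		return err
	}
	defer w.Stop()
	for event := range w.ResultChan() {
		fmt.Printf("event: %s %s\n", event.Type, event.Object.GetObjectKind().GroupVersionKind())
	}
	return nil
}
```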

Add more complex autoscaling math

Revert metrics expectation changes

Fix e2e test with hard coded label expectations

WIP resolve test issues

- Update TestConflictingDefinitions_NamespaceToRoot to expect
  a management conflict from the remediator, and for the
  remediator's conflict to replace the applier's conflict.
- De-dupe management conflict errors from remediator & applier.
- Fix a bug in the remediator that was dropping management
  conflict errors when using a watch filter. With the filter in
  place, changing the filter labels produces a DELETE event rather
  than the UPDATE event it produced before. The only way to
  double-check is to query the cluster, and we don't want to do
  that in the watch handler for performance reasons. So instead,
  update the remediator queue worker to query the cluster to
  confirm object deletion. Then also update the worker so it can
  add management conflict errors to the same conflictHandler
  used by the watcher.
- Add HasConflictErrors() to the conflictHandler to replace the
  locked managementConflict bool in the watch.Manager. This will
  cause the retry run loop to re-apply if there's still a conflict.
  Is this desired behavior? Or do we only want to re-apply the
  first time? If so, we may need to cache the errors in the run
  loop so we can detect when new conflicts are added. Or maybe
  just expose the whole conflictHandler.
- Clean up parse.Run a little. Add resetRetryBackoff method to
  avoid leaking implementation details to the rest of the parser.
- Remove obsolete retry setting, since it wasn't exposed and was
  previously replaced with a backoff strategy.
- Fix the reconciler container CPU calculation to use milli-cpu.
- Add ConflictingObjectID to ManagementConflictError to allow
  de-duping by ID (see the sketch after this list).
- Fix KptManagementConflictError to return a ManagementConflictError,
  not just a ResourceError. The currentManager will be UNKNOWN,
  but it only seems to be used by reportRootSyncConflicts, which
  only reports conflicts from the remediator, not the applier.
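
A minimal sketch of the de-duping conflictHandler described above. The type and method names here are assumptions standing in for the real Config Sync types; only ObjMetadata comes from cli-utils.

```go
// Sketch: de-dupe management conflicts by object ID and expose HasConflictErrors
// for the retry loop. Illustrative only, not the actual conflictHandler.
package sketch

import (
	"sync"

	"sigs.k8s.io/cli-utils/pkg/object"
)

// conflictError is a stand-in for a management conflict error with an object ID.
type conflictError interface {
	error
	ConflictingObjectID() object.ObjMetadata
}

type conflictHandler struct {
	mu        sync.Mutex
	conflicts map[object.ObjMetadata]conflictError
}

func newConflictHandler() *conflictHandler {
	return &conflictHandler{conflicts: map[object.ObjMetadata]conflictError{}}
}

// AddConflictError records a conflict, overwriting any previous error for the
// same object so the remediator's error can replace the applier's.
func (h *conflictHandler) AddConflictError(e conflictError) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.conflicts[e.ConflictingObjectID()] = e
}

// HasConflictErrors reports whether any conflicts are still outstanding, which
// the retry run loop can use to decide whether to re-apply.
func (h *conflictHandler) HasConflictErrors() bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	return len(h.conflicts) > 0
}
```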

Reduce memory/cpu step size to 128Mi:125m

Reduce memory step size to 32Mi

This allows scaling down to 256Mi total reconciler Pod memory, not
just 512Mi. However, on non-bursting autopilot clusters, the
minimum is still 512Mi.

Avoid invalid periodic sync status updates

Skip sync status updates while the source cache is reset; otherwise
the previously synced commit would be removed from the status.

Refactor Applier to expose an event handler

- Use the new event handler to send errors and inventory updates
  to the caller (Updater) so it can track them without locks in
  the applier (a rough sketch follows this list).
- Update the Updater to cache the sync status with a shared lock.
- Add initialization to the Updater to refresh the sync status
  using the RootSync/RepoSync status. This should avoid losing
  status fields after the reconciler is restarted or rescheduled.
  Unfortunately, this doesn't work for errors, because they can't
  be parsed back into typed errors. But this shouldn't be a real
  problem because the applier was invalidating them before each
  apply anyway.
- Move periodic sync status updates into the Updater.
- Add a baseFinalizer class to handle collecting destroyer errors.
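
A minimal sketch of the event-handler pattern described above, assuming hypothetical interface and method names rather than the actual applier API:

```go
// Sketch: the applier reports errors and inventory updates through a
// caller-supplied handler instead of shared, locked applier state.
package sketch

import (
	"sync"

	"sigs.k8s.io/cli-utils/pkg/object"
)

// superEventHandler receives applier progress events from the apply run.
type superEventHandler interface {
	OnError(err error)
	OnInventoryUpdate(ids object.ObjMetadataSet)
}

// updater caches the latest sync status under a single shared lock.
type updater struct {
	mu        sync.RWMutex
	errs      []error
	inventory object.ObjMetadataSet
}

func (u *updater) OnError(err error) {
	u.mu.Lock()
	defer u.mu.Unlock()
	u.errs = append(u.errs, err)
}

func (u *updater) OnInventoryUpdate(ids object.ObjMetadataSet) {
	u.mu.Lock()
	defer u.mu.Unlock()
	u.inventory = ids
}

var _ superEventHandler = &updater{}
```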

Double memory per object

Even though the memory limit appears higher than actual usage, the
reconciler is still being killed by the kernel because the node runs
out of memory on autopilot. So we need to be a little more defensive
and request more memory than we strictly need, to avoid OOMKills on
crowded nodes.

Add TestStressProfileResourcesByObjectCount

Fix e2e tests that revert manual changes

In order for drift correction to work now, the object needs the
correct owning-inventory label used by the watch filter.
Without the label, drift will not be reverted.

Add TestStressProfileResourcesByObjectCountWithMultiSync

Fix waiting for deployments

Use configsync.gke.io/parent-package-id as filter

- Filter label key: configsync.gke.io/parent-package-id
- PACKAGE_ID: <PACKAGE_ID_FULL[0:53]>-<hex(fnv32a(PACKAGE_ID_FULL))>
- PACKAGE_ID_FULL: <NAME>.<NAMESPACE>.<RootSync|RepoSync>
- Pre-pad the hex encoding of the hash to 8 characters for a consistent length (see the sketch below)
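
A sketch of the PACKAGE_ID construction in Go. It assumes the hash suffix is only appended when the full ID would exceed the 63-character label-value limit (the e2e YAML later in this diff uses the short, untruncated form); the function name is hypothetical.

```go
// Sketch: build the configsync.gke.io/parent-package-id label value.
package main

import (
	"fmt"
	"hash/fnv"
	"strings"
)

func packageID(name, namespace, kind string) string {
	full := strings.Join([]string{name, namespace, kind}, ".") // <NAME>.<NAMESPACE>.<RootSync|RepoSync>
	if len(full) <= 63 {
		return full // fits in a label value as-is (assumption)
	}
	h := fnv.New32a()
	_, _ = h.Write([]byte(full))
	// Pre-pad the hex of the hash to 8 characters for consistent length.
	return fmt.Sprintf("%s-%08x", full[:53], h.Sum32())
}

func main() {
	fmt.Println(packageID("root-sync", "config-management-system", "RootSync"))
	// Output: root-sync.config-management-system.RootSync
}
```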

Add configsync.gke.io/package-id label to RSyncs

- Use the reconciler-manager field manager when updating objects, to
  distinguish its updates from the reconciler's. Otherwise SSA from a
  parent reconciler managing an RSync will delete any metadata updates
  made by the reconciler-manager (see the sketch below).
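
With controller-runtime, applying under a dedicated field manager looks roughly like this; the sketch is illustrative, not the actual reconciler-manager code.

```go
// Sketch: server-side apply with a field manager distinct from the reconciler's,
// so the reconciler-manager's metadata updates are owned separately.
package sketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func applyAsReconcilerManager(ctx context.Context, c client.Client, cm *corev1.ConfigMap) error {
	return c.Patch(ctx, cm, client.Apply,
		client.FieldOwner("reconciler-manager"),
		client.ForceOwnership)
}
```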

Fix rebase errors

Fix labels in TestKubectl tests

Use latest cli-utils watch filters
karlkfi committed May 25, 2024
1 parent 529d24f commit b404034
Showing 81 changed files with 2,823 additions and 676 deletions.
4 changes: 2 additions & 2 deletions cmd/reconciler/main.go
@@ -24,6 +24,7 @@ import (
"k8s.io/klog/v2/textlogger"
"kpt.dev/configsync/pkg/api/configsync"
"kpt.dev/configsync/pkg/api/configsync/v1beta1"
"kpt.dev/configsync/pkg/applier"
"kpt.dev/configsync/pkg/declared"
"kpt.dev/configsync/pkg/importer/filesystem"
"kpt.dev/configsync/pkg/importer/filesystem/cmpath"
@@ -180,7 +181,6 @@ func main() {
ReconcilerScope: declared.Scope(*scope),
ResyncPeriod: *resyncPeriod,
PollingPeriod: *pollingPeriod,
RetryPeriod: configsync.DefaultReconcilerRetryPeriod,
StatusUpdatePeriod: configsync.DefaultReconcilerSyncStatusUpdatePeriod,
SourceRoot: absSourceDir,
RepoRoot: absRepoRoot,
@@ -193,7 +193,7 @@ func main() {
SyncDir: relSyncDir,
SyncName: *syncName,
ReconcilerName: *reconcilerName,
StatusMode: *statusMode,
StatusMode: applier.InventoryStatusMode(*statusMode),
ReconcileTimeout: *reconcileTimeout,
APIServerTimeout: *apiServerTimeout,
RenderingEnabled: *renderingEnabled,
79 changes: 76 additions & 3 deletions e2e/nomostest/prometheus_metrics.go
@@ -256,8 +256,12 @@ func reconcilerOperationMetrics(nt *NT, syncLabels prometheusmodel.LabelSet, op
return func(ctx context.Context, v1api prometheusv1.API) error {
var err error
err = multierr.Append(err, metricAPICallDurationViewOperationHasStatus(ctx, nt, v1api, syncLabels, string(op.Operation), ocmetrics.StatusSuccess))
err = multierr.Append(err, metricApplyOperationsViewHasValueAtLeast(ctx, nt, v1api, syncLabels, string(op.Operation), ocmetrics.StatusSuccess, op.Count))
// err = multierr.Append(err, metricAPICallDurationViewOperationMustNotHaveStatus(ctx, nt, v1api, syncLabels, string(op.Operation), ocmetrics.StatusError))
// err = multierr.Append(err, metricApplyOperationsViewMustNotHaveStatus(ctx, nt, v1api, syncLabels, ocmetrics.RemediatorController, string(op.Operation), ocmetrics.StatusError))
// err = multierr.Append(err, metricApplyOperationsViewMustNotHaveStatus(ctx, nt, v1api, syncLabels, ocmetrics.ApplierController, string(op.Operation), ocmetrics.StatusError))
err = multierr.Append(err, metricApplyOperationsViewHasValueAtLeast(ctx, nt, v1api, syncLabels, ocmetrics.ApplierController, string(op.Operation), ocmetrics.StatusSuccess, op.Count))
err = multierr.Append(err, metricRemediateDurationViewHasStatus(ctx, nt, v1api, syncLabels, ocmetrics.StatusSuccess))
// err = multierr.Append(err, metricRemediateDurationViewMustNotHaveStatus(ctx, nt, v1api, syncLabels, ocmetrics.StatusError))
return err
}
}
@@ -462,12 +466,25 @@ func metricAPICallDurationViewOperationHasStatus(ctx context.Context, nt *NT, v1
return metricExists(ctx, nt, v1api, query)
}

func metricApplyOperationsViewHasValueAtLeast(ctx context.Context, nt *NT, v1api prometheusv1.API, syncLabels prometheusmodel.LabelSet, operation, status string, value int) error {
func metricAPICallDurationViewOperationMustNotHaveStatus(ctx context.Context, nt *NT, v1api prometheusv1.API, syncLabels prometheusmodel.LabelSet, operation, status string) error {
metricName := ocmetrics.APICallDurationView.Name
// APICallDurationView is a distribution. Query count to aggregate.
metricName = fmt.Sprintf("%s%s%s", prometheusConfigSyncMetricPrefix, metricName, prometheusDistributionCountSuffix)
labels := prometheusmodel.LabelSet{
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName),
prometheusmodel.LabelName(ocmetrics.KeyOperation.Name()): prometheusmodel.LabelValue(operation),
prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status),
}.Merge(syncLabels)
query := fmt.Sprintf("%s%s", metricName, labels)
return metricMustNotExist(ctx, nt, v1api, query)
}

func metricApplyOperationsViewHasValueAtLeast(ctx context.Context, nt *NT, v1api prometheusv1.API, syncLabels prometheusmodel.LabelSet, controller, operation, status string, value int) error {
metricName := ocmetrics.ApplyOperationsView.Name
metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName)
labels := prometheusmodel.LabelSet{
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName),
prometheusmodel.LabelName(ocmetrics.KeyController.Name()): prometheusmodel.LabelValue(ocmetrics.ApplierController),
prometheusmodel.LabelName(ocmetrics.KeyController.Name()): prometheusmodel.LabelValue(controller),
prometheusmodel.LabelName(ocmetrics.KeyOperation.Name()): prometheusmodel.LabelValue(operation),
prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status),
}.Merge(syncLabels)
@@ -476,6 +493,20 @@ func metricApplyOperationsViewHasValueAtLeast(ctx context.Context, nt *NT, v1api
return metricExistsWithValueAtLeast(ctx, nt, v1api, query, float64(value))
}

func metricApplyOperationsViewMustNotHaveStatus(ctx context.Context, nt *NT, v1api prometheusv1.API, syncLabels prometheusmodel.LabelSet, controller, operation, status string) error {
metricName := ocmetrics.ApplyOperationsView.Name
metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName)
labels := prometheusmodel.LabelSet{
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName),
prometheusmodel.LabelName(ocmetrics.KeyController.Name()): prometheusmodel.LabelValue(controller),
prometheusmodel.LabelName(ocmetrics.KeyOperation.Name()): prometheusmodel.LabelValue(operation),
prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status),
}.Merge(syncLabels)
// ApplyOperationsView is a count, so we don't need to aggregate
query := fmt.Sprintf("%s%s", metricName, labels)
return metricMustNotExist(ctx, nt, v1api, query)
}

func metricRemediateDurationViewHasStatus(ctx context.Context, nt *NT, v1api prometheusv1.API, syncLabels prometheusmodel.LabelSet, status string) error {
metricName := ocmetrics.RemediateDurationView.Name
// RemediateDurationView is a distribution. Query count to aggregate.
@@ -488,6 +519,18 @@ func metricRemediateDurationViewHasStatus(ctx context.Context, nt *NT, v1api pro
return metricExists(ctx, nt, v1api, query)
}

func metricRemediateDurationViewMustNotHaveStatus(ctx context.Context, nt *NT, v1api prometheusv1.API, syncLabels prometheusmodel.LabelSet, status string) error {
metricName := ocmetrics.RemediateDurationView.Name
// RemediateDurationView is a distribution. Query count to aggregate.
metricName = fmt.Sprintf("%s%s%s", prometheusConfigSyncMetricPrefix, metricName, prometheusDistributionCountSuffix)
labels := prometheusmodel.LabelSet{
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName),
prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status),
}.Merge(syncLabels)
query := fmt.Sprintf("%s%s", metricName, labels)
return metricMustNotExist(ctx, nt, v1api, query)
}

// metricQueryNow performs the specified query with the default timeout.
// Query is debug logged. Warnings are logged. Errors are returned.
func metricQueryNow(ctx context.Context, nt *NT, v1api prometheusv1.API, query string) (prometheusmodel.Value, error) {
@@ -505,6 +548,36 @@ func metricQueryNow(ctx context.Context, nt *NT, v1api prometheusv1.API, query s
return response, nil
}

// metricResultMustNotExist validates that the query response includes zero
// vector or matrix results. Response is debug logged.
// This function does not perform the query, just validates and logs it.
func metricResultMustNotExist(nt *NT, query string, response prometheusmodel.Value) error {
switch result := response.(type) {
case prometheusmodel.Vector:
if len(result) > 0 {
nt.Logger.Debugf("prometheus vector response:\n%s", result)
return fmt.Errorf("unexpected results from prometheus query (found %d, expected 0): %s", len(result), query)
}
return nil
case prometheusmodel.Matrix:
if len(result) > 0 {
nt.Logger.Debugf("prometheus matrix response:\n%s", result)
return fmt.Errorf("unexpected results from prometheus query (found %d, expected 0): %s", len(result), query)
}
return nil
default:
return fmt.Errorf("unsupported prometheus response: %T", response)
}
}

func metricMustNotExist(ctx context.Context, nt *NT, v1api prometheusv1.API, query string) error {
response, err := metricQueryNow(ctx, nt, v1api, query)
if err != nil {
return err
}
return metricResultMustNotExist(nt, query, response)
}

// metricResultMustExist validates that the query response includes at least one
// vector or matrix result. Response is debug logged.
// This function does not perform the query, just validates and logs it.
48 changes: 34 additions & 14 deletions e2e/testcases/managed_resources_test.go
@@ -37,19 +37,21 @@ import (

// This file includes tests for drift correction and drift prevention.
//
// The drift correction in the mono-repo mode utilizes the following two annotations:
// * configmanagement.gke.io/managed
// * configsync.gke.io/resource-id
// The drift correction in the mono-repo mode utilizes the following metadata:
// * the configmanagement.gke.io/managed annotation
// * the configsync.gke.io/resource-id annotation
//
// The drift correction in the multi-repo mode utilizes the following three annotations:
// * configmanagement.gke.io/managed
// * configsync.gke.io/resource-id
// * configsync.gke.io/manager
// The drift correction in the multi-repo mode utilizes the following metadata:
// * the configmanagement.gke.io/managed annotation
// * the configsync.gke.io/resource-id annotation
// * the configsync.gke.io/manager annotation
// * the config.k8s.io/owning-inventory label
//
// The drift prevention is only supported in the multi-repo mode, and utilizes the following Config Sync metadata:
// The drift prevention is only supported in the multi-repo mode, and utilizes the following metadata:
// * the configmanagement.gke.io/managed annotation
// * the configsync.gke.io/resource-id annotation
// * the configsync.gke.io/declared-version label
// * the config.k8s.io/owning-inventory label

// The reason we have both TestKubectlCreatesManagedNamespaceResourceMonoRepo and
// TestKubectlCreatesManagedNamespaceResourceMultiRepo is that the mono-repo mode and
@@ -79,6 +81,8 @@ metadata:
configmanagement.gke.io/managed: enabled
configsync.gke.io/resource-id: _namespace_test-ns1
configsync.gke.io/manager: :root
labels:
configsync.gke.io/parent-package-id: root-sync.config-management-system.RootSync
`)

if err := os.WriteFile(filepath.Join(nt.TmpDir, "test-ns1.yaml"), ns, 0644); err != nil {
@@ -106,7 +110,9 @@ metadata:
annotations:
configmanagement.gke.io/managed: enabled
configsync.gke.io/resource-id: _namespace_test-ns2
configsync.gke.io/manager: :abcdef
configsync.gke.io/manager: config-management-system_abcdef
labels:
configsync.gke.io/parent-package-id: abcdef.config-management-system.RootSync
`)

if err := os.WriteFile(filepath.Join(nt.TmpDir, "test-ns2.yaml"), ns, 0644); err != nil {
@@ -121,8 +127,10 @@ metadata:
// Wait 5 seconds so that the remediator can process the event.
time.Sleep(5 * time.Second)

// The `configsync.gke.io/manager` annotation of `test-ns2` suggests that its manager is ':abcdef'.
// The root reconciler does not manage `test-ns2`, therefore should not remove `test-ns2`.
// The `configsync.gke.io/manager` annotation of `test-ns2` suggests that
// its manager is 'config-management-system_abcdef' (RootSync named 'abcdef').
// The root reconciler (RootSync named 'root-sync') does not manage
// `test-ns2`, therefore should not remove `test-ns2`.
err = nt.Validate("test-ns2", "", &corev1.Namespace{},
testpredicates.HasExactlyAnnotationKeys(
metadata.ResourceManagementKey,
@@ -233,6 +241,8 @@ metadata:
configmanagement.gke.io/managed: enabled
configsync.gke.io/resource-id: _configmap_bookstore_test-cm1
configsync.gke.io/manager: :root
labels:
configsync.gke.io/parent-package-id: root-sync.config-management-system.RootSync
data:
weekday: "monday"
`)
@@ -263,6 +273,8 @@ metadata:
configmanagement.gke.io/managed: enabled
configsync.gke.io/resource-id: _configmap_bookstore_wrong-cm2
configsync.gke.io/manager: :root
labels:
configsync.gke.io/parent-package-id: root-sync.config-management-system.RootSync
data:
weekday: "monday"
`)
@@ -301,7 +313,9 @@ metadata:
annotations:
configmanagement.gke.io/managed: enabled
configsync.gke.io/resource-id: _configmap_bookstore_test-cm3
configsync.gke.io/manager: :abcdef
configsync.gke.io/manager: config-management-system_abcdef
labels:
configsync.gke.io/parent-package-id: abcdef.config-management-system.RootSync
data:
weekday: "monday"
`)
@@ -318,8 +332,10 @@ data:
// Wait 5 seconds so that the reconciler can process the event.
time.Sleep(5 * time.Second)

// The `configsync.gke.io/manager` annotation of `test-ns3` suggests that its manager is ':abcdef'.
// The root reconciler does not manage `test-ns3`, therefore should not remove `test-ns3`.
// The `configsync.gke.io/manager` annotation of `test-ns3` suggests that
// its manager is 'config-management-system_abcdef' (RootSync named 'abcdef').
// The root reconciler (RootSync named 'root-sync') does not manage `test-ns3`,
// therefore should not remove `test-ns3`.
err = nt.Validate("test-cm3", "bookstore", &corev1.ConfigMap{},
testpredicates.HasExactlyAnnotationKeys(
metadata.ResourceManagementKey,
Expand All @@ -340,6 +356,8 @@ metadata:
annotations:
configmanagement.gke.io/managed: enabled
configsync.gke.io/resource-id: _configmap_bookstore_test-cm4
labels:
configsync.gke.io/parent-package-id: root-sync.config-management-system.RootSync
data:
weekday: "monday"
`)
@@ -378,6 +396,8 @@ metadata:
configmanagement.gke.io/managed: enabled
configsync.gke.io/resource-id: _configmap_bookstore_test-secret
configsync.gke.io/manager: :root
labels:
configsync.gke.io/parent-package-id: root-sync.config-management-system.RootSync
`)

if err := os.WriteFile(filepath.Join(nt.TmpDir, "test-secret.yaml"), cm, 0644); err != nil {