Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Prototype autoscaling by object count #1187

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/klog/v2/textlogger"
"kpt.dev/configsync/pkg/api/configsync"
"kpt.dev/configsync/pkg/api/configsync/v1beta1"
"kpt.dev/configsync/pkg/applier"
"kpt.dev/configsync/pkg/declared"
"kpt.dev/configsync/pkg/importer/filesystem"
"kpt.dev/configsync/pkg/importer/filesystem/cmpath"
Expand Down Expand Up @@ -180,7 +181,6 @@ func main() {
ReconcilerScope: declared.Scope(*scope),
ResyncPeriod: *resyncPeriod,
PollingPeriod: *pollingPeriod,
RetryPeriod: configsync.DefaultReconcilerRetryPeriod,
StatusUpdatePeriod: configsync.DefaultReconcilerSyncStatusUpdatePeriod,
SourceRoot: absSourceDir,
RepoRoot: absRepoRoot,
Expand All @@ -193,7 +193,7 @@ func main() {
SyncDir: relSyncDir,
SyncName: *syncName,
ReconcilerName: *reconcilerName,
StatusMode: *statusMode,
StatusMode: applier.InventoryStatusMode(*statusMode),
ReconcileTimeout: *reconcileTimeout,
APIServerTimeout: *apiServerTimeout,
RenderingEnabled: *renderingEnabled,
Expand Down
79 changes: 76 additions & 3 deletions e2e/nomostest/prometheus_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,12 @@ func reconcilerOperationMetrics(nt *NT, syncLabels prometheusmodel.LabelSet, op
return func(ctx context.Context, v1api prometheusv1.API) error {
var err error
err = multierr.Append(err, metricAPICallDurationViewOperationHasStatus(ctx, nt, v1api, syncLabels, string(op.Operation), ocmetrics.StatusSuccess))
err = multierr.Append(err, metricApplyOperationsViewHasValueAtLeast(ctx, nt, v1api, syncLabels, string(op.Operation), ocmetrics.StatusSuccess, op.Count))
// err = multierr.Append(err, metricAPICallDurationViewOperationMustNotHaveStatus(ctx, nt, v1api, syncLabels, string(op.Operation), ocmetrics.StatusError))
// err = multierr.Append(err, metricApplyOperationsViewMustNotHaveStatus(ctx, nt, v1api, syncLabels, ocmetrics.RemediatorController, string(op.Operation), ocmetrics.StatusError))
// err = multierr.Append(err, metricApplyOperationsViewMustNotHaveStatus(ctx, nt, v1api, syncLabels, ocmetrics.ApplierController, string(op.Operation), ocmetrics.StatusError))
err = multierr.Append(err, metricApplyOperationsViewHasValueAtLeast(ctx, nt, v1api, syncLabels, ocmetrics.ApplierController, string(op.Operation), ocmetrics.StatusSuccess, op.Count))
err = multierr.Append(err, metricRemediateDurationViewHasStatus(ctx, nt, v1api, syncLabels, ocmetrics.StatusSuccess))
// err = multierr.Append(err, metricRemediateDurationViewMustNotHaveStatus(ctx, nt, v1api, syncLabels, ocmetrics.StatusError))
return err
}
}
Expand Down Expand Up @@ -462,12 +466,25 @@ func metricAPICallDurationViewOperationHasStatus(ctx context.Context, nt *NT, v1
return metricExists(ctx, nt, v1api, query)
}

func metricApplyOperationsViewHasValueAtLeast(ctx context.Context, nt *NT, v1api prometheusv1.API, syncLabels prometheusmodel.LabelSet, operation, status string, value int) error {
func metricAPICallDurationViewOperationMustNotHaveStatus(ctx context.Context, nt *NT, v1api prometheusv1.API, syncLabels prometheusmodel.LabelSet, operation, status string) error {
metricName := ocmetrics.APICallDurationView.Name
// APICallDurationView is a distribution. Query count to aggregate.
metricName = fmt.Sprintf("%s%s%s", prometheusConfigSyncMetricPrefix, metricName, prometheusDistributionCountSuffix)
labels := prometheusmodel.LabelSet{
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName),
prometheusmodel.LabelName(ocmetrics.KeyOperation.Name()): prometheusmodel.LabelValue(operation),
prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status),
}.Merge(syncLabels)
query := fmt.Sprintf("%s%s", metricName, labels)
return metricMustNotExist(ctx, nt, v1api, query)
}

func metricApplyOperationsViewHasValueAtLeast(ctx context.Context, nt *NT, v1api prometheusv1.API, syncLabels prometheusmodel.LabelSet, controller, operation, status string, value int) error {
metricName := ocmetrics.ApplyOperationsView.Name
metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName)
labels := prometheusmodel.LabelSet{
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName),
prometheusmodel.LabelName(ocmetrics.KeyController.Name()): prometheusmodel.LabelValue(ocmetrics.ApplierController),
prometheusmodel.LabelName(ocmetrics.KeyController.Name()): prometheusmodel.LabelValue(controller),
prometheusmodel.LabelName(ocmetrics.KeyOperation.Name()): prometheusmodel.LabelValue(operation),
prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status),
}.Merge(syncLabels)
Expand All @@ -476,6 +493,20 @@ func metricApplyOperationsViewHasValueAtLeast(ctx context.Context, nt *NT, v1api
return metricExistsWithValueAtLeast(ctx, nt, v1api, query, float64(value))
}

func metricApplyOperationsViewMustNotHaveStatus(ctx context.Context, nt *NT, v1api prometheusv1.API, syncLabels prometheusmodel.LabelSet, controller, operation, status string) error {
metricName := ocmetrics.ApplyOperationsView.Name
metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName)
labels := prometheusmodel.LabelSet{
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName),
prometheusmodel.LabelName(ocmetrics.KeyController.Name()): prometheusmodel.LabelValue(controller),
prometheusmodel.LabelName(ocmetrics.KeyOperation.Name()): prometheusmodel.LabelValue(operation),
prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status),
}.Merge(syncLabels)
// ApplyOperationsView is a count, so we don't need to aggregate
query := fmt.Sprintf("%s%s", metricName, labels)
return metricMustNotExist(ctx, nt, v1api, query)
}

func metricRemediateDurationViewHasStatus(ctx context.Context, nt *NT, v1api prometheusv1.API, syncLabels prometheusmodel.LabelSet, status string) error {
metricName := ocmetrics.RemediateDurationView.Name
// RemediateDurationView is a distribution. Query count to aggregate.
Expand All @@ -488,6 +519,18 @@ func metricRemediateDurationViewHasStatus(ctx context.Context, nt *NT, v1api pro
return metricExists(ctx, nt, v1api, query)
}

func metricRemediateDurationViewMustNotHaveStatus(ctx context.Context, nt *NT, v1api prometheusv1.API, syncLabels prometheusmodel.LabelSet, status string) error {
metricName := ocmetrics.RemediateDurationView.Name
// RemediateDurationView is a distribution. Query count to aggregate.
metricName = fmt.Sprintf("%s%s%s", prometheusConfigSyncMetricPrefix, metricName, prometheusDistributionCountSuffix)
labels := prometheusmodel.LabelSet{
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName),
prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status),
}.Merge(syncLabels)
query := fmt.Sprintf("%s%s", metricName, labels)
return metricMustNotExist(ctx, nt, v1api, query)
}

// metricQueryNow performs the specified query with the default timeout.
// Query is debug logged. Warnings are logged. Errors are returned.
func metricQueryNow(ctx context.Context, nt *NT, v1api prometheusv1.API, query string) (prometheusmodel.Value, error) {
Expand All @@ -505,6 +548,36 @@ func metricQueryNow(ctx context.Context, nt *NT, v1api prometheusv1.API, query s
return response, nil
}

// metricResultMustNotExist validates that the query response includes zero
// vector or matrix results. Response is debug logged.
// This function does not perform the query, just validates and logs it.
func metricResultMustNotExist(nt *NT, query string, response prometheusmodel.Value) error {
switch result := response.(type) {
case prometheusmodel.Vector:
if len(result) > 0 {
nt.Logger.Debugf("prometheus vector response:\n%s", result)
return fmt.Errorf("unexpected results from prometheus query (found %d, expected 0): %s", len(result), query)
}
return nil
case prometheusmodel.Matrix:
if len(result) > 0 {
nt.Logger.Debugf("prometheus matrix response:\n%s", result)
return fmt.Errorf("unexpected results from prometheus query (found %d, expected 0): %s", len(result), query)
}
return nil
default:
return fmt.Errorf("unsupported prometheus response: %T", response)
}
}

func metricMustNotExist(ctx context.Context, nt *NT, v1api prometheusv1.API, query string) error {
response, err := metricQueryNow(ctx, nt, v1api, query)
if err != nil {
return err
}
return metricResultMustNotExist(nt, query, response)
}

// metricResultMustExist validates that the query response includes at least one
// vector or matrix result. Response is debug logged.
// This function does not perform the query, just validates and logs it.
Expand Down
14 changes: 5 additions & 9 deletions e2e/testcases/custom_resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,12 @@ func TestCRDDeleteBeforeRemoveCustomResourceV1(t *testing.T) {
nt.T.Fatal(err)
}

// Wait for remediator to detect the drift (managed Anvil CR was deleted)
// and record it as a conflict.
// TODO: distinguish between management conflict (spec/generation drift) and concurrent status update conflict (resource version change)
err = nomostest.ValidateMetrics(nt,
// Reverting a manual change is NOT considered a "conflict", unless the new
// owner is a different reconciler.
nt.Must(nomostest.ValidateMetrics(nt,
nomostest.ReconcilerErrorMetrics(nt, rootSyncLabels, firstCommitHash, metrics.ErrorSummary{
Conflicts: 1,
}))
if err != nil {
nt.T.Fatal(err)
}
Conflicts: 0, // from the remediator
})))

// Reset discovery client to invalidate the cached Anvil CRD
nt.RenewClient()
Expand Down
48 changes: 34 additions & 14 deletions e2e/testcases/managed_resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,21 @@ import (

// This file includes tests for drift correction and drift prevention.
//
// The drift correction in the mono-repo mode utilizes the following two annotations:
// * configmanagement.gke.io/managed
// * configsync.gke.io/resource-id
// The drift correction in the mono-repo mode utilizes the following metadata:
// * the configmanagement.gke.io/managed annotation
// * the configsync.gke.io/resource-id annotation
//
// The drift correction in the multi-repo mode utilizes the following three annotations:
// * configmanagement.gke.io/managed
// * configsync.gke.io/resource-id
// * configsync.gke.io/manager
// The drift correction in the multi-repo mode utilizes the following metadata:
// * the configmanagement.gke.io/managed annotation
// * the configsync.gke.io/resource-id annotation
// * the configsync.gke.io/manager annotation
// * the config.k8s.io/owning-inventory label
//
// The drift prevention is only supported in the multi-repo mode, and utilizes the following Config Sync metadata:
// The drift prevention is only supported in the multi-repo mode, and utilizes the following metadata:
// * the configmanagement.gke.io/managed annotation
// * the configsync.gke.io/resource-id annotation
// * the configsync.gke.io/declared-version label
// * the config.k8s.io/owning-inventory label

// The reason we have both TestKubectlCreatesManagedNamespaceResourceMonoRepo and
// TestKubectlCreatesManagedNamespaceResourceMultiRepo is that the mono-repo mode and
Expand Down Expand Up @@ -79,6 +81,8 @@ metadata:
configmanagement.gke.io/managed: enabled
configsync.gke.io/resource-id: _namespace_test-ns1
configsync.gke.io/manager: :root
labels:
configsync.gke.io/parent-package-id: root-sync.config-management-system.RootSync
`)

if err := os.WriteFile(filepath.Join(nt.TmpDir, "test-ns1.yaml"), ns, 0644); err != nil {
Expand Down Expand Up @@ -106,7 +110,9 @@ metadata:
annotations:
configmanagement.gke.io/managed: enabled
configsync.gke.io/resource-id: _namespace_test-ns2
configsync.gke.io/manager: :abcdef
configsync.gke.io/manager: config-management-system_abcdef
labels:
configsync.gke.io/parent-package-id: abcdef.config-management-system.RootSync
`)

if err := os.WriteFile(filepath.Join(nt.TmpDir, "test-ns2.yaml"), ns, 0644); err != nil {
Expand All @@ -121,8 +127,10 @@ metadata:
// Wait 5 seconds so that the remediator can process the event.
time.Sleep(5 * time.Second)

// The `configsync.gke.io/manager` annotation of `test-ns2` suggests that its manager is ':abcdef'.
// The root reconciler does not manage `test-ns2`, therefore should not remove `test-ns2`.
// The `configsync.gke.io/manager` annotation of `test-ns2` suggests that
// its manager is 'config-management-system_abcdef' (RootSync named 'abcdef').
// The root reconciler (RootSync named 'root-sync') does not manage
// `test-ns2`, therefore should not remove `test-ns2`.
err = nt.Validate("test-ns2", "", &corev1.Namespace{},
testpredicates.HasExactlyAnnotationKeys(
metadata.ResourceManagementKey,
Expand Down Expand Up @@ -233,6 +241,8 @@ metadata:
configmanagement.gke.io/managed: enabled
configsync.gke.io/resource-id: _configmap_bookstore_test-cm1
configsync.gke.io/manager: :root
labels:
configsync.gke.io/parent-package-id: root-sync.config-management-system.RootSync
data:
weekday: "monday"
`)
Expand Down Expand Up @@ -263,6 +273,8 @@ metadata:
configmanagement.gke.io/managed: enabled
configsync.gke.io/resource-id: _configmap_bookstore_wrong-cm2
configsync.gke.io/manager: :root
labels:
configsync.gke.io/parent-package-id: root-sync.config-management-system.RootSync
data:
weekday: "monday"
`)
Expand Down Expand Up @@ -301,7 +313,9 @@ metadata:
annotations:
configmanagement.gke.io/managed: enabled
configsync.gke.io/resource-id: _configmap_bookstore_test-cm3
configsync.gke.io/manager: :abcdef
configsync.gke.io/manager: config-management-system_abcdef
labels:
configsync.gke.io/parent-package-id: abcdef.config-management-system.RootSync
data:
weekday: "monday"
`)
Expand All @@ -318,8 +332,10 @@ data:
// Wait 5 seconds so that the reconciler can process the event.
time.Sleep(5 * time.Second)

// The `configsync.gke.io/manager` annotation of `test-ns3` suggests that its manager is ':abcdef'.
// The root reconciler does not manage `test-ns3`, therefore should not remove `test-ns3`.
// The `configsync.gke.io/manager` annotation of `test-ns3` suggests that
// its manager is 'config-management-system_abcdef' (RootSync named 'abcdef').
// The root reconciler (RootSync named 'root-sync') does not manage `test-ns3`,
// therefore should not remove `test-ns3`.
err = nt.Validate("test-cm3", "bookstore", &corev1.ConfigMap{},
testpredicates.HasExactlyAnnotationKeys(
metadata.ResourceManagementKey,
Expand All @@ -340,6 +356,8 @@ metadata:
annotations:
configmanagement.gke.io/managed: enabled
configsync.gke.io/resource-id: _configmap_bookstore_test-cm4
labels:
configsync.gke.io/parent-package-id: root-sync.config-management-system.RootSync
data:
weekday: "monday"
`)
Expand Down Expand Up @@ -378,6 +396,8 @@ metadata:
configmanagement.gke.io/managed: enabled
configsync.gke.io/resource-id: _configmap_bookstore_test-secret
configsync.gke.io/manager: :root
labels:
configsync.gke.io/parent-package-id: root-sync.config-management-system.RootSync
`)

if err := os.WriteFile(filepath.Join(nt.TmpDir, "test-secret.yaml"), cm, 0644); err != nil {
Expand Down
Loading