From 721a7e117dcf5966a57d456f1351a5db22296aec Mon Sep 17 00:00:00 2001 From: Karl Isenberg Date: Tue, 2 Jul 2024 12:24:24 -0700 Subject: [PATCH] Add watch filtering by package-id label - Update k8s & cli-utils & controller-runtime - Fix Fprintf calls to handle errors --- .golangci.yaml | 2 +- Makefile | 2 +- cmd/nomos/status/cluster_state.go | 34 ++--- cmd/nomos/util/util.go | 14 +- cmd/nomos/version/version.go | 8 +- e2e/testcases/custom_resources_test.go | 14 +- e2e/testcases/managed_resources_test.go | 46 ++++-- e2e/testcases/multi_sync_test.go | 18 +-- e2e/testcases/preserve_fields_test.go | 26 ++-- go.mod | 2 +- go.sum | 8 +- pkg/applier/applier_test.go | 9 +- pkg/applier/clientset.go | 16 ++- pkg/applier/status.go | 48 +++++-- pkg/metadata/labels.go | 6 + pkg/metadata/metadata_test.go | 6 + pkg/metadata/package_id.go | 44 ++++++ pkg/metadata/package_id_test.go | 125 ++++++++++++++++ pkg/parse/annotations.go | 2 + pkg/parse/annotations_test.go | 12 +- pkg/parse/root_test.go | 43 +++++- pkg/reconciler/reconciler.go | 4 +- .../controllers/reconciler_base.go | 25 ++++ .../controllers/reposync_controller.go | 6 +- .../controllers/rootsync_controller.go | 6 +- pkg/remediator/watch/filteredwatcher_test.go | 2 + pkg/remediator/watch/manager.go | 13 ++ pkg/remediator/watch/manager_test.go | 2 + pkg/remediator/watch/watcher.go | 5 + pkg/util/env.go | 13 +- vendor/modules.txt | 4 +- .../cli-utils/pkg/apply/applier_builder.go | 5 + .../cli-utils/pkg/apply/builder.go | 11 +- .../cli-utils/pkg/apply/destroyer_builder.go | 5 + .../pkg/apply/mutator/apply_time_mutator.go | 2 +- .../cli-utils/pkg/apply/task/apply_task.go | 4 +- .../cli-utils/pkg/jsonpath/jsonpath.go | 11 +- .../kstatus/watcher/default_status_watcher.go | 30 +++- .../watcher/dynamic_informer_factory.go | 135 +++++++++++++++--- .../kstatus/watcher/object_status_reporter.go | 2 +- 40 files changed, 636 insertions(+), 134 deletions(-) create mode 100644 pkg/metadata/package_id.go create mode 100644 pkg/metadata/package_id_test.go diff --git a/.golangci.yaml b/.golangci.yaml index d8d8dd6cc5..5331bd9953 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -26,7 +26,7 @@ linters: linters-settings: staticcheck: - go: "1.21" + go: "1.22" issues: exclude-rules: diff --git a/Makefile b/Makefile index 788df03677..a54353ed72 100644 --- a/Makefile +++ b/Makefile @@ -62,7 +62,7 @@ GO_LICENSES := $(BIN_DIR)/go-licenses ADDLICENSE_VERSION := v1.1.1 ADDLICENSE := $(BIN_DIR)/addlicense -GOLANGCI_LINT_VERSION := v1.56.2 +GOLANGCI_LINT_VERSION := v1.59.1 GOLANGCI_LINT := $(BIN_DIR)/golangci-lint KUSTOMIZE_VERSION := v5.4.2-gke.0 diff --git a/cmd/nomos/status/cluster_state.go b/cmd/nomos/status/cluster_state.go index 70bf2551ae..d5105b3689 100644 --- a/cmd/nomos/status/cluster_state.go +++ b/cmd/nomos/status/cluster_state.go @@ -48,15 +48,15 @@ type ClusterState struct { } func (c *ClusterState) printRows(writer io.Writer) { - fmt.Fprintln(writer, "") - fmt.Fprintf(writer, "%s\n", c.Ref) + util.MustFprintf(writer, "\n") + util.MustFprintf(writer, "%s\n", c.Ref) if c.status != "" || c.Error != "" { - fmt.Fprintf(writer, "%s%s\n", util.Indent, util.Separator) - fmt.Fprintf(writer, "%s%s\t%s\n", util.Indent, c.status, c.Error) + util.MustFprintf(writer, "%s%s\n", util.Indent, util.Separator) + util.MustFprintf(writer, "%s%s\t%s\n", util.Indent, c.status, c.Error) } for _, repo := range c.repos { if name == "" || name == repo.syncName { - fmt.Fprintf(writer, "%s%s\n", util.Indent, util.Separator) + util.MustFprintf(writer, "%s%s\n", util.Indent, util.Separator) repo.printRows(writer) } } @@ -90,44 +90,44 @@ type RepoState struct { } func (r *RepoState) printRows(writer io.Writer) { - fmt.Fprintf(writer, "%s%s:%s\t%s\t\n", util.Indent, r.scope, r.syncName, sourceString(r.sourceType, r.git, r.oci, r.helm)) + util.MustFprintf(writer, "%s%s:%s\t%s\t\n", util.Indent, r.scope, r.syncName, sourceString(r.sourceType, r.git, r.oci, r.helm)) if r.status == syncedMsg { - fmt.Fprintf(writer, "%s%s @ %v\t%s\t\n", util.Indent, r.status, r.lastSyncTimestamp, r.commit) + util.MustFprintf(writer, "%s%s @ %v\t%s\t\n", util.Indent, r.status, r.lastSyncTimestamp, r.commit) } else { - fmt.Fprintf(writer, "%s%s\t%s\t\n", util.Indent, r.status, r.commit) + util.MustFprintf(writer, "%s%s\t%s\t\n", util.Indent, r.status, r.commit) } if r.errorSummary != nil && r.errorSummary.TotalCount > 0 { if r.errorSummary.Truncated { - fmt.Fprintf(writer, "%sTotalErrorCount: %d, ErrorTruncated: %v, ErrorCountAfterTruncation: %d\n", util.Indent, + util.MustFprintf(writer, "%sTotalErrorCount: %d, ErrorTruncated: %v, ErrorCountAfterTruncation: %d\n", util.Indent, r.errorSummary.TotalCount, r.errorSummary.Truncated, r.errorSummary.ErrorCountAfterTruncation) } else { - fmt.Fprintf(writer, "%sTotalErrorCount: %d\n", util.Indent, r.errorSummary.TotalCount) + util.MustFprintf(writer, "%sTotalErrorCount: %d\n", util.Indent, r.errorSummary.TotalCount) } } for _, err := range r.errors { - fmt.Fprintf(writer, "%sError:\t%s\t\n", util.Indent, err) + util.MustFprintf(writer, "%sError:\t%s\t\n", util.Indent, err) } if resourceStatus && len(r.resources) > 0 { sort.Sort(byNamespaceAndType(r.resources)) - fmt.Fprintf(writer, "%sManaged resources:\n", util.Indent) + util.MustFprintf(writer, "%sManaged resources:\n", util.Indent) hasSourceHash := r.resources[0].SourceHash != "" if !hasSourceHash { - fmt.Fprintf(writer, "%s\tNAMESPACE\tNAME\tSTATUS\n", util.Indent) + util.MustFprintf(writer, "%s\tNAMESPACE\tNAME\tSTATUS\n", util.Indent) } else { - fmt.Fprintf(writer, "%s\tNAMESPACE\tNAME\tSTATUS\tSOURCEHASH\n", util.Indent) + util.MustFprintf(writer, "%s\tNAMESPACE\tNAME\tSTATUS\tSOURCEHASH\n", util.Indent) } for _, r := range r.resources { if !hasSourceHash { - fmt.Fprintf(writer, "%s\t%s\t%s\t%s\n", util.Indent, r.Namespace, r.String(), r.Status) + util.MustFprintf(writer, "%s\t%s\t%s\t%s\n", util.Indent, r.Namespace, r.String(), r.Status) } else { - fmt.Fprintf(writer, "%s\t%s\t%s\t%s\t%s\n", util.Indent, r.Namespace, r.String(), r.Status, r.SourceHash) + util.MustFprintf(writer, "%s\t%s\t%s\t%s\t%s\n", util.Indent, r.Namespace, r.String(), r.Status, r.SourceHash) } if len(r.Conditions) > 0 { for _, condition := range r.Conditions { - fmt.Fprintf(writer, "%s%s%s%s%s\n", util.Indent, util.Indent, util.Indent, util.Indent, condition.Message) + util.MustFprintf(writer, "%s%s%s%s%s\n", util.Indent, util.Indent, util.Indent, util.Indent, condition.Message) } } } diff --git a/cmd/nomos/util/util.go b/cmd/nomos/util/util.go index 5507921e55..fb7843f8f5 100644 --- a/cmd/nomos/util/util.go +++ b/cmd/nomos/util/util.go @@ -45,12 +45,20 @@ func MonoRepoNotice(writer io.Writer, monoRepoClusters ...string) { clusterCount := len(monoRepoClusters) if clusterCount != 0 { if clusterCount == 1 { - fmt.Fprintf(writer, "%sNotice: The cluster %q is still running in the legacy mode.\n", + MustFprintf(writer, "%sNotice: The cluster %q is still running in the legacy mode.\n", ColorYellow, monoRepoClusters[0]) } else { - fmt.Fprintf(writer, "%sNotice: The following clusters are still running in the legacy mode:\n%s%s\n", + MustFprintf(writer, "%sNotice: The following clusters are still running in the legacy mode:\n%s%s\n", ColorYellow, Bullet, strings.Join(monoRepoClusters, "\n"+Bullet)) } - fmt.Fprintf(writer, "Run `nomos migrate` to enable multi-repo mode. It provides you with additional features and gives you the flexibility to sync to a single repository, or multiple repositories.%s\n", ColorDefault) + MustFprintf(writer, "Run `nomos migrate` to enable multi-repo mode. It provides you with additional features and gives you the flexibility to sync to a single repository, or multiple repositories.%s\n", ColorDefault) + } +} + +// MustFprintf prints to the writer and panics if it errors. +func MustFprintf(writer io.Writer, format string, a ...any) { + _, err := fmt.Fprintf(writer, format, a...) + if err != nil { + panic(fmt.Sprintf("Error writing output: %v", err)) } } diff --git a/cmd/nomos/version/version.go b/cmd/nomos/version/version.go index de032b8359..a912247402 100644 --- a/cmd/nomos/version/version.go +++ b/cmd/nomos/version/version.go @@ -284,15 +284,15 @@ func tabulate(es []entry, out io.Writer) { w := util.NewWriter(out) defer func() { if err := w.Flush(); err != nil { - fmt.Fprintf(os.Stderr, "error on Flush(): %v", err) + util.MustFprintf(os.Stderr, "error on Flush(): %v", err) } }() - fmt.Fprintf(w, format, "CURRENT", "CLUSTER_CONTEXT_NAME", "COMPONENT", "VERSION") + util.MustFprintf(w, format, "CURRENT", "CLUSTER_CONTEXT_NAME", "COMPONENT", "VERSION") for _, e := range es { if e.err != nil { - fmt.Fprintf(w, format, e.current, e.name, e.component, fmt.Sprintf("", e.err)) + util.MustFprintf(w, format, e.current, e.name, e.component, fmt.Sprintf("", e.err)) continue } - fmt.Fprintf(w, format, e.current, e.name, e.component, e.version) + util.MustFprintf(w, format, e.current, e.name, e.component, e.version) } } diff --git a/e2e/testcases/custom_resources_test.go b/e2e/testcases/custom_resources_test.go index 66444db2b9..02c9f5fee4 100644 --- a/e2e/testcases/custom_resources_test.go +++ b/e2e/testcases/custom_resources_test.go @@ -109,16 +109,12 @@ func TestCRDDeleteBeforeRemoveCustomResourceV1(t *testing.T) { nt.T.Fatal(err) } - // Wait for remediator to detect the drift (managed Anvil CR was deleted) - // and record it as a conflict. - // TODO: distinguish between management conflict (spec/generation drift) and concurrent status update conflict (resource version change) - err = nomostest.ValidateMetrics(nt, + // Reverting a manual change is NOT considered a "conflict", unless the new + // owner is a different reconciler. + nt.Must(nomostest.ValidateMetrics(nt, nomostest.ReconcilerErrorMetrics(nt, rootSyncLabels, firstCommitHash, metrics.ErrorSummary{ - Conflicts: 1, - })) - if err != nil { - nt.T.Fatal(err) - } + Conflicts: 0, // from the remediator + }))) // Reset discovery client to invalidate the cached Anvil CRD nt.RenewClient() diff --git a/e2e/testcases/managed_resources_test.go b/e2e/testcases/managed_resources_test.go index f8a18d9a83..c5ba2629b5 100644 --- a/e2e/testcases/managed_resources_test.go +++ b/e2e/testcases/managed_resources_test.go @@ -37,19 +37,21 @@ import ( // This file includes tests for drift correction and drift prevention. // -// The drift correction in the mono-repo mode utilizes the following two annotations: -// * configmanagement.gke.io/managed -// * configsync.gke.io/resource-id +// The drift correction in the mono-repo mode utilizes the following metadata: +// * the configmanagement.gke.io/managed annotation +// * the configsync.gke.io/resource-id annotation // -// The drift correction in the multi-repo mode utilizes the following three annotations: -// * configmanagement.gke.io/managed -// * configsync.gke.io/resource-id -// * configsync.gke.io/manager +// The drift correction in the multi-repo mode utilizes the following metadata: +// * the configmanagement.gke.io/managed annotation +// * the configsync.gke.io/resource-id annotation +// * the configsync.gke.io/manager annotation +// * the config.k8s.io/owning-inventory label // // The drift prevention is only supported in the multi-repo mode, and utilizes the following Config Sync metadata: // * the configmanagement.gke.io/managed annotation // * the configsync.gke.io/resource-id annotation // * the configsync.gke.io/declared-version label +// * the config.k8s.io/owning-inventory label // The reason we have both TestKubectlCreatesManagedNamespaceResourceMonoRepo and // TestKubectlCreatesManagedNamespaceResourceMultiRepo is that the mono-repo mode and @@ -79,6 +81,8 @@ metadata: configmanagement.gke.io/managed: enabled configsync.gke.io/resource-id: _namespace_test-ns1 configsync.gke.io/manager: :root + labels: + configsync.gke.io/parent-package-id: root-sync.config-management-system.RootSync `) if err := os.WriteFile(filepath.Join(nt.TmpDir, "test-ns1.yaml"), ns, 0644); err != nil { @@ -106,7 +110,9 @@ metadata: annotations: configmanagement.gke.io/managed: enabled configsync.gke.io/resource-id: _namespace_test-ns2 - configsync.gke.io/manager: :abcdef + configsync.gke.io/manager: config-management-system_abcdef + labels: + configsync.gke.io/parent-package-id: abcdef.config-management-system.RootSync `) if err := os.WriteFile(filepath.Join(nt.TmpDir, "test-ns2.yaml"), ns, 0644); err != nil { @@ -121,8 +127,10 @@ metadata: // Wait 5 seconds so that the remediator can process the event. time.Sleep(5 * time.Second) - // The `configsync.gke.io/manager` annotation of `test-ns2` suggests that its manager is ':abcdef'. - // The root reconciler does not manage `test-ns2`, therefore should not remove `test-ns2`. + // The `configsync.gke.io/manager` annotation of `test-ns2` suggests that + // its manager is 'config-management-system_abcdef' (RootSync named 'abcdef'). + // The root reconciler (RootSync named 'root-sync') does not manage + // `test-ns2`, therefore should not remove `test-ns2`. err = nt.Validate("test-ns2", "", &corev1.Namespace{}, testpredicates.HasExactlyAnnotationKeys( metadata.ResourceManagementKey, @@ -233,6 +241,8 @@ metadata: configmanagement.gke.io/managed: enabled configsync.gke.io/resource-id: _configmap_bookstore_test-cm1 configsync.gke.io/manager: :root + labels: + configsync.gke.io/parent-package-id: root-sync.config-management-system.RootSync data: weekday: "monday" `) @@ -263,6 +273,8 @@ metadata: configmanagement.gke.io/managed: enabled configsync.gke.io/resource-id: _configmap_bookstore_wrong-cm2 configsync.gke.io/manager: :root + labels: + configsync.gke.io/parent-package-id: root-sync.config-management-system.RootSync data: weekday: "monday" `) @@ -301,7 +313,9 @@ metadata: annotations: configmanagement.gke.io/managed: enabled configsync.gke.io/resource-id: _configmap_bookstore_test-cm3 - configsync.gke.io/manager: :abcdef + configsync.gke.io/manager: config-management-system_abcdef + labels: + configsync.gke.io/parent-package-id: abcdef.config-management-system.RootSync data: weekday: "monday" `) @@ -318,8 +332,10 @@ data: // Wait 5 seconds so that the reconciler can process the event. time.Sleep(5 * time.Second) - // The `configsync.gke.io/manager` annotation of `test-ns3` suggests that its manager is ':abcdef'. - // The root reconciler does not manage `test-ns3`, therefore should not remove `test-ns3`. + // The `configsync.gke.io/manager` annotation of `test-ns3` suggests that + // its manager is 'config-management-system_abcdef' (RootSync named 'abcdef'). + // The root reconciler (RootSync named 'root-sync') does not manage `test-ns3`, + // therefore should not remove `test-ns3`. err = nt.Validate("test-cm3", "bookstore", &corev1.ConfigMap{}, testpredicates.HasExactlyAnnotationKeys( metadata.ResourceManagementKey, @@ -340,6 +356,8 @@ metadata: annotations: configmanagement.gke.io/managed: enabled configsync.gke.io/resource-id: _configmap_bookstore_test-cm4 + labels: + configsync.gke.io/parent-package-id: root-sync.config-management-system.RootSync data: weekday: "monday" `) @@ -378,6 +396,8 @@ metadata: configmanagement.gke.io/managed: enabled configsync.gke.io/resource-id: _configmap_bookstore_test-secret configsync.gke.io/manager: :root + labels: + configsync.gke.io/parent-package-id: root-sync.config-management-system.RootSync `) if err := os.WriteFile(filepath.Join(nt.TmpDir, "test-secret.yaml"), cm, 0644); err != nil { diff --git a/e2e/testcases/multi_sync_test.go b/e2e/testcases/multi_sync_test.go index 362700ad73..5cb8b7a797 100644 --- a/e2e/testcases/multi_sync_test.go +++ b/e2e/testcases/multi_sync_test.go @@ -453,20 +453,20 @@ func TestConflictingDefinitions_NamespaceToRoot(t *testing.T) { } // Validate reconciler error metric is emitted from namespace reconciler. - rootSyncLabels, err := nomostest.MetricLabelsForRepoSync(nt, repoSyncNN) + repoSyncLabels, err := nomostest.MetricLabelsForRepoSync(nt, repoSyncNN) if err != nil { nt.T.Fatal(err) } commitHash := nt.NonRootRepos[repoSyncNN].MustHash(nt.T) - err = nomostest.ValidateMetrics(nt, - nomostest.ReconcilerSyncError(nt, rootSyncLabels, commitHash), - nomostest.ReconcilerErrorMetrics(nt, rootSyncLabels, commitHash, metrics.ErrorSummary{ - Sync: 1, - })) - if err != nil { - nt.T.Fatal(err) - } + // Reverting a manual change IS considered a "conflict" when the new + // owner is a different reconciler. + nt.Must(nomostest.ValidateMetrics(nt, + nomostest.ReconcilerSyncError(nt, repoSyncLabels, commitHash), + nomostest.ReconcilerErrorMetrics(nt, repoSyncLabels, commitHash, metrics.ErrorSummary{ + Conflicts: 1, // from the remediator + Sync: 1, // remediator conflict error replaces applier conflict error + }))) nt.T.Logf("Ensure the Role matches the one in the Root repo %s", configsync.RootSyncName) err = nt.Validate("pods", testNs, &rbacv1.Role{}, diff --git a/e2e/testcases/preserve_fields_test.go b/e2e/testcases/preserve_fields_test.go index 30c8ca13c7..48ff5ba92f 100644 --- a/e2e/testcases/preserve_fields_test.go +++ b/e2e/testcases/preserve_fields_test.go @@ -333,7 +333,11 @@ func TestAddUpdateDeleteLabels(t *testing.T) { nt.T.Fatal(err) } - var defaultLabels = []string{metadata.ManagedByKey, metadata.DeclaredVersionLabel} + var defaultLabels = []string{ + metadata.ManagedByKey, + metadata.ParentPackageIDLabel, + metadata.DeclaredVersionLabel, + } // Checking that the configmap with no labels appears on cluster, and // that no user labels are specified @@ -350,12 +354,13 @@ func TestAddUpdateDeleteLabels(t *testing.T) { nt.T.Fatal(err) } - // Checking that label is updated after syncing an update. - err = nt.Validate(cmName, ns, &corev1.ConfigMap{}, - testpredicates.HasExactlyLabelKeys(append(defaultLabels, "baz")...)) - if err != nil { - nt.T.Fatal(err) - } + var updatedLabels []string + updatedLabels = append(updatedLabels, defaultLabels...) + updatedLabels = append(updatedLabels, "baz") + + // Checking that label was added after syncing. + nt.Must(nt.Validate(cmName, ns, &corev1.ConfigMap{}, + testpredicates.HasExactlyLabelKeys(updatedLabels...))) delete(cm.Labels, "baz") nt.Must(nt.RootRepos[configsync.RootSyncName].Add(cmPath, cm)) @@ -365,11 +370,8 @@ func TestAddUpdateDeleteLabels(t *testing.T) { } // Check that the label is deleted after syncing. - err = nt.Validate(cmName, ns, &corev1.ConfigMap{}, - testpredicates.HasExactlyLabelKeys(metadata.ManagedByKey, metadata.DeclaredVersionLabel)) - if err != nil { - nt.T.Fatal(err) - } + nt.Must(nt.Validate(cmName, ns, &corev1.ConfigMap{}, + testpredicates.HasExactlyLabelKeys(defaultLabels...))) nt.MetricsExpectations.AddObjectApply(configsync.RootSyncKind, rootSyncNN, nsObj) nt.MetricsExpectations.AddObjectApply(configsync.RootSyncKind, rootSyncNN, cm) diff --git a/go.mod b/go.mod index 365d5d22c5..5a18c80884 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,7 @@ require ( k8s.io/kubectl v0.30.1 k8s.io/kubernetes v1.30.1 k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 - sigs.k8s.io/cli-utils v0.35.1-0.20240504222723-227a03f4a7f9 + sigs.k8s.io/cli-utils v0.37.2 sigs.k8s.io/controller-runtime v0.18.4 sigs.k8s.io/controller-runtime/tools/setup-envtest v0.0.0-20231023142458-b9f29826ee83 sigs.k8s.io/kind v0.20.0 diff --git a/go.sum b/go.sum index 8f99311102..2e9b9c36d0 100644 --- a/go.sum +++ b/go.sum @@ -402,8 +402,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/onsi/ginkgo/v2 v2.17.2 h1:7eMhcy3GimbsA3hEnVKdw/PQM9XN9krpKVXsZdph0/g= -github.com/onsi/ginkgo/v2 v2.17.2/go.mod h1:nP2DPOQoNsQmsVyv5rDA8JkXQoCs6goXIvr/PRJ1eCc= +github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= +github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= github.com/open-policy-agent/cert-controller v0.10.2-0.20240531181455-2649f121ab97 h1:otk1yQtHleQq5VIy5C+rAYnErOT/WrvDyplqPhYDqb8= @@ -1093,8 +1093,8 @@ k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0/go.mod h1:OLgZIPagt7ERELqWJFomSt rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= -sigs.k8s.io/cli-utils v0.35.1-0.20240504222723-227a03f4a7f9 h1:fe6u8xC3vudos+7qXxaxEbB6xS0MLjv4tZN3t1sAWIo= -sigs.k8s.io/cli-utils v0.35.1-0.20240504222723-227a03f4a7f9/go.mod h1:uCFC3BPXB3xHFQyKkWUlTrncVDCKzbdDfqZqRTCrk24= +sigs.k8s.io/cli-utils v0.37.2 h1:GOfKw5RV2HDQZDJlru5KkfLO1tbxqMoyn1IYUxqBpNg= +sigs.k8s.io/cli-utils v0.37.2/go.mod h1:V+IZZr4UoGj7gMJXklWBg6t5xbdThFBcpj4MrZuCYco= sigs.k8s.io/controller-runtime v0.18.4 h1:87+guW1zhvuPLh1PHybKdYFLU0YJp4FhJRmiHvm5BZw= sigs.k8s.io/controller-runtime v0.18.4/go.mod h1:TVoGrfdpbA9VRFaRnKgk9P5/atA0pMwq+f+msb9M8Sg= sigs.k8s.io/controller-runtime/tools/setup-envtest v0.0.0-20231023142458-b9f29826ee83 h1:37G7lMdeXe0kogUnwCa1K+EIVYR5f4BqM0bqITyaDBI= diff --git a/pkg/applier/applier_test.go b/pkg/applier/applier_test.go index 209cc1a465..0e59343fc3 100644 --- a/pkg/applier/applier_test.go +++ b/pkg/applier/applier_test.go @@ -98,10 +98,11 @@ func TestApply(t *testing.T) { "example-to-not-delete": "anything", }) abandonObj.SetLabels(map[string]string{ - metadata.ManagedByKey: metadata.ManagedByValue, - metadata.SystemLabel: "anything", - metadata.ArchLabel: "anything", - "example-to-not-delete": "anything", + metadata.ManagedByKey: metadata.ManagedByValue, + metadata.ParentPackageIDLabel: "anything", + metadata.SystemLabel: "anything", + metadata.ArchLabel: "anything", + "example-to-not-delete": "anything", }) abandonObjID := core.IDOf(abandonObj) diff --git a/pkg/applier/clientset.go b/pkg/applier/clientset.go index f46194c055..d69336bdbc 100644 --- a/pkg/applier/clientset.go +++ b/pkg/applier/clientset.go @@ -19,12 +19,15 @@ import ( "github.com/GoogleContainerTools/kpt/pkg/live" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/labels" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/klog/v2" "k8s.io/kubectl/pkg/cmd/util" + "kpt.dev/configsync/pkg/metadata" "sigs.k8s.io/cli-utils/pkg/apply" "sigs.k8s.io/cli-utils/pkg/apply/event" "sigs.k8s.io/cli-utils/pkg/inventory" + "sigs.k8s.io/cli-utils/pkg/kstatus/watcher" "sigs.k8s.io/cli-utils/pkg/object" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -53,7 +56,7 @@ type ClientSet struct { } // NewClientSet constructs a new ClientSet. -func NewClientSet(c client.Client, configFlags *genericclioptions.ConfigFlags, statusMode string) (*ClientSet, error) { +func NewClientSet(c client.Client, configFlags *genericclioptions.ConfigFlags, statusMode, packageID string) (*ClientSet, error) { matchVersionKubeConfigFlags := util.NewMatchVersionFlags(configFlags) f := util.NewFactory(matchVersionKubeConfigFlags) @@ -71,9 +74,19 @@ func NewClientSet(c client.Client, configFlags *genericclioptions.ConfigFlags, s return nil, err } + // Only watch objects applied by this reconciler for status updates. + // This reduces both the number of events processed and the memory used by + // the informer cache. + watchFilters := &watcher.Filters{ + Labels: labels.Set{ + metadata.ParentPackageIDLabel: packageID, + }.AsSelector(), + } + applier, err := apply.NewApplierBuilder(). WithInventoryClient(invClient). WithFactory(f). + WithStatusWatcherFilters(watchFilters). Build() if err != nil { return nil, err @@ -82,6 +95,7 @@ func NewClientSet(c client.Client, configFlags *genericclioptions.ConfigFlags, s destroyer, err := apply.NewDestroyerBuilder(). WithInventoryClient(invClient). WithFactory(f). + WithStatusWatcherFilters(watchFilters). Build() if err != nil { return nil, err diff --git a/pkg/applier/status.go b/pkg/applier/status.go index 006c926a37..8995f98fdc 100644 --- a/pkg/applier/status.go +++ b/pkg/applier/status.go @@ -57,11 +57,17 @@ func (m ObjectStatusMap) Log(logger infofLogger) { var b strings.Builder for i, status := range actuationStatuses { if i > 0 { - b.WriteString(commaEscapedNewlineDelimiter) + if _, err := b.WriteString(commaEscapedNewlineDelimiter); err != nil { + logger.Infof("Error: Failed to write delimiter to string builder: %v", err) + return + } } ids := m.Filter(actuation.ActuationStrategyApply, status, -1) count += len(ids) - writeStatus(&b, status, ids) + if err := writeStatus(&b, status, ids); err != nil { + logger.Infof("Error: Failed to write actuation status to string builder: %v", err) + return + } } if count == 0 { logger.Infof("Apply Actuations (Total: %d)", count) @@ -73,11 +79,17 @@ func (m ObjectStatusMap) Log(logger infofLogger) { b.Reset() for i, status := range reconcileStatuses { if i > 0 { - b.WriteString(commaEscapedNewlineDelimiter) + if _, err := b.WriteString(commaEscapedNewlineDelimiter); err != nil { + logger.Infof("Error: Failed to write delimiter to string builder: %v", err) + return + } } ids := m.Filter(actuation.ActuationStrategyApply, -1, status) count += len(ids) - writeStatus(&b, status, ids) + if err := writeStatus(&b, status, ids); err != nil { + logger.Infof("Error: Failed to write reconcile status to string builder: %v", err) + return + } } if count == 0 { logger.Infof("Apply Reconciles (Total: %d)", count) @@ -89,11 +101,17 @@ func (m ObjectStatusMap) Log(logger infofLogger) { b.Reset() for i, status := range actuationStatuses { if i > 0 { - b.WriteString(commaEscapedNewlineDelimiter) + if _, err := b.WriteString(commaEscapedNewlineDelimiter); err != nil { + logger.Infof("Error: Failed to write delimiter to string builder: %v", err) + return + } } ids := m.Filter(actuation.ActuationStrategyDelete, status, -1) count += len(ids) - writeStatus(&b, status, ids) + if err := writeStatus(&b, status, ids); err != nil { + logger.Infof("Error: Failed to write actuation status to string builder: %v", err) + return + } } if count == 0 { logger.Infof("Delete Actuations (Total: %d)", count) @@ -105,11 +123,17 @@ func (m ObjectStatusMap) Log(logger infofLogger) { b.Reset() for i, status := range reconcileStatuses { if i > 0 { - b.WriteString(commaEscapedNewlineDelimiter) + if _, err := b.WriteString(commaEscapedNewlineDelimiter); err != nil { + logger.Infof("Error: Failed to write delimiter to string builder: %v", err) + return + } } ids := m.Filter(actuation.ActuationStrategyDelete, -1, status) count += len(ids) - writeStatus(&b, status, ids) + if err := writeStatus(&b, status, ids); err != nil { + logger.Infof("Error: Failed to write reconcile status to string builder: %v", err) + return + } } if count == 0 { logger.Infof("Delete Reconciles (Total: %d)", count) @@ -118,12 +142,14 @@ func (m ObjectStatusMap) Log(logger infofLogger) { } } -func writeStatus(w io.Writer, status interface{ String() string }, ids []core.ID) { +func writeStatus(w io.Writer, status interface{ String() string }, ids []core.ID) error { + var err error if len(ids) == 0 { - fmt.Fprintf(w, "%s (%d)", status, len(ids)) + _, err = fmt.Fprintf(w, "%s (%d)", status, len(ids)) } else { - fmt.Fprintf(w, "%s (%d): [%s]", status, len(ids), joinIDs(commaSpaceDelimiter, ids...)) + _, err = fmt.Fprintf(w, "%s (%d): [%s]", status, len(ids), joinIDs(commaSpaceDelimiter, ids...)) } + return err } // Filter returns an unsorted list of IDs that satisfy the specified constraints. diff --git a/pkg/metadata/labels.go b/pkg/metadata/labels.go index b2c3a2144a..1418f81b0b 100644 --- a/pkg/metadata/labels.go +++ b/pkg/metadata/labels.go @@ -61,6 +61,12 @@ const ( // the resource. Similar to the well known app.kubernetes.io/managed-by label, // but scoped to Config Sync. ConfigSyncManagedByLabel = configsync.ConfigSyncPrefix + "managed-by" + + // ParentPackageIDLabel indicates the parent package a managed object was sourced from. + ParentPackageIDLabel = configsync.ConfigSyncPrefix + "parent-package-id" + + // PackageIDLabel indicates the ID of the package represented by a RootSync or RepoSync. + PackageIDLabel = configsync.ConfigSyncPrefix + "package-id" ) // DepthSuffix is a label suffix for hierarchical namespace depth. diff --git a/pkg/metadata/metadata_test.go b/pkg/metadata/metadata_test.go index 667f730de1..dbf7448188 100644 --- a/pkg/metadata/metadata_test.go +++ b/pkg/metadata/metadata_test.go @@ -105,6 +105,12 @@ func TestHasConfigSyncMetadata(t *testing.T) { core.Label(metadata.ManagedByKey, "random-value")), want: false, }, + { + name: "An object with the `parent-package-id` label", + obj: fake.UnstructuredObject(kinds.Deployment(), core.Name("deploy"), + core.Label(metadata.ParentPackageIDLabel, "random-value")), + want: true, + }, } for _, tc := range testcases { diff --git a/pkg/metadata/package_id.go b/pkg/metadata/package_id.go new file mode 100644 index 0000000000..c018eabc57 --- /dev/null +++ b/pkg/metadata/package_id.go @@ -0,0 +1,44 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metadata + +import ( + "fmt" + "hash/fnv" + + "k8s.io/apimachinery/pkg/util/validation" +) + +const maxLabelLength = validation.LabelValueMaxLength // 63 + +// PackageID is a label value that uniquely identifies a RootSync or RepoSync. +// PACKAGE_ID = - +// PACKAGE_ID_FULL = .. +// Design: go/config-sync-watch-filter +func PackageID(syncName, syncNamespace, syncKind string) string { + packageID := fmt.Sprintf("%s.%s.%s", syncName, syncNamespace, syncKind) + if len(packageID) <= maxLabelLength { + return packageID + } + // fnv32a has slightly better avalanche characteristics than fnv32 + hasher := fnv.New32a() + hasher.Write([]byte(fmt.Sprintf("%s.%s.%s", syncName, syncNamespace, syncKind))) + // Converting 32-bit fnv to hex results in at most 8 characters. + // Rarely it's fewer, so pad the prefix with zeros to make it consistent. + suffix := fmt.Sprintf("%08x", hasher.Sum32()) + packageIDLen := maxLabelLength - len(suffix) - 1 + packageIDShort := packageID[0 : packageIDLen-1] + return fmt.Sprintf("%s-%s", packageIDShort, suffix) +} diff --git a/pkg/metadata/package_id_test.go b/pkg/metadata/package_id_test.go new file mode 100644 index 0000000000..d5d2c676df --- /dev/null +++ b/pkg/metadata/package_id_test.go @@ -0,0 +1,125 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metadata + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/util/validation" + "kpt.dev/configsync/pkg/api/configsync" +) + +const ( + // maxLabelLength = validation.LabelValueMaxLength // 63 + maxNameLength = validation.DNS1123SubdomainMaxLength // 253 +) +const loremIpsum = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum." + +func TestPackageIDLabelValue(t *testing.T) { + loremIpsumValid := strings.ReplaceAll(loremIpsum, " ", "") + loremIpsumValid = strings.ReplaceAll(loremIpsumValid, ",", "") + loremIpsumValid = strings.ReplaceAll(loremIpsumValid, ".", "") + loremIpsumValid = strings.ToLower(loremIpsumValid) + loremIpsumReverse := reverse(loremIpsumValid) + + type args struct { + syncName string + syncNamespace string + syncKind string + } + tests := []struct { + name string + in args + out string + }{ + { + // As long as possible + name: "253 char name & 63 char namespace", + in: args{ + syncName: loremIpsumValid[0 : maxNameLength-1], + syncNamespace: loremIpsumReverse[0 : maxLabelLength-1], + syncKind: configsync.RootSyncKind, + }, + out: "loremipsumdolorsitametconsecteturadipiscingelitseddoe-9758dfab", + }, + { + // Padded checksum + name: "63 char name & 63 char namespace", + in: args{ + syncName: loremIpsumValid[0 : maxLabelLength-1], + syncNamespace: loremIpsumReverse[0 : maxLabelLength-1], + syncKind: configsync.RootSyncKind, + }, + out: "loremipsumdolorsitametconsecteturadipiscingelitseddoe-0e8e8264", + }, + { + // Trailing dot + name: "53 char name & 63 char namespace", + in: args{ + syncName: loremIpsumValid[0:52], + syncNamespace: loremIpsumReverse[0 : maxLabelLength-1], + syncKind: configsync.RootSyncKind, + }, + out: "loremipsumdolorsitametconsecteturadipiscingelitseddo.-ea053540", + }, + { + // Just a little too long + name: "30 char name & 30 char namespace", + in: args{ + syncName: loremIpsumValid[0:29], + syncNamespace: loremIpsumReverse[0:29], + syncKind: configsync.RepoSyncKind, + }, + out: "loremipsumdolorsitametconsect.murobaltsediminatillomt-ff884db0", + }, + { + // Short enough to not hash + name: "10 char name & 10 char namespace", + in: args{ + syncName: loremIpsumValid[0:9], + syncNamespace: loremIpsumReverse[0:9], + syncKind: configsync.RepoSyncKind, + }, + out: "loremipsu.murobalts.RepoSync", + }, + { + // As short as possible + name: "1 char name & 1 char namespace", + in: args{ + syncName: loremIpsumValid[0:1], + syncNamespace: loremIpsumReverse[0:1], + syncKind: configsync.RepoSyncKind, + }, + out: "l.m.RepoSync", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + out := PackageID(tt.in.syncName, tt.in.syncNamespace, tt.in.syncKind) + assert.Equal(t, tt.out, out) + assert.LessOrEqual(t, len(out), 63) + }) + } +} + +func reverse(s string) string { + runes := []rune(s) + for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 { + runes[i], runes[j] = runes[j], runes[i] + } + return string(runes) +} diff --git a/pkg/parse/annotations.go b/pkg/parse/annotations.go index a0865011f8..ceab1e6c71 100644 --- a/pkg/parse/annotations.go +++ b/pkg/parse/annotations.go @@ -37,10 +37,12 @@ func addAnnotationsAndLabels(objs []ast.FileObject, scope declared.Scope, syncNa if err != nil { return fmt.Errorf("marshaling sourceContext: %w", err) } + packageID := metadata.PackageID(syncName, scope.SyncNamespace(), scope.SyncKind()) inventoryID := applier.InventoryID(syncName, scope.SyncNamespace()) manager := declared.ResourceManager(scope, syncName) for _, obj := range objs { core.SetLabel(obj, metadata.ManagedByKey, metadata.ManagedByValue) + core.SetLabel(obj, metadata.ParentPackageIDLabel, packageID) core.SetAnnotation(obj, metadata.GitContextKey, string(gcVal)) core.SetAnnotation(obj, metadata.ResourceManagerKey, manager) core.SetAnnotation(obj, metadata.SyncTokenAnnotationKey, commitHash) diff --git a/pkg/parse/annotations_test.go b/pkg/parse/annotations_test.go index fc92985267..c21cd621b7 100644 --- a/pkg/parse/annotations_test.go +++ b/pkg/parse/annotations_test.go @@ -18,14 +18,21 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "kpt.dev/configsync/pkg/api/configsync" "kpt.dev/configsync/pkg/applier" "kpt.dev/configsync/pkg/core" + "kpt.dev/configsync/pkg/declared" "kpt.dev/configsync/pkg/importer/analyzer/ast" "kpt.dev/configsync/pkg/metadata" "kpt.dev/configsync/pkg/testing/fake" ) func TestAddAnnotationsAndLabels(t *testing.T) { + syncKind := configsync.RepoSyncKind + syncName := "rs" + syncNamespace := "some-namespace" + syncScope := declared.ScopeFromSyncNamespace(syncNamespace) + testcases := []struct { name string actual []ast.FileObject @@ -50,11 +57,12 @@ func TestAddAnnotationsAndLabels(t *testing.T) { expected: []ast.FileObject{fake.Role( core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(metadata.ParentPackageIDLabel, metadata.PackageID(syncName, syncNamespace, syncKind)), core.Annotation(metadata.ResourceManagementKey, "enabled"), core.Annotation(metadata.ResourceManagerKey, "some-namespace_rs"), core.Annotation(metadata.SyncTokenAnnotationKey, "1234567"), core.Annotation(metadata.GitContextKey, `{"repo":"git@github.com/foo","branch":"main","rev":"HEAD"}`), - core.Annotation(metadata.OwningInventoryKey, applier.InventoryID("rs", "some-namespace")), + core.Annotation(metadata.OwningInventoryKey, applier.InventoryID(syncName, syncNamespace)), core.Annotation(metadata.ResourceIDKey, "rbac.authorization.k8s.io_role_foo_default-name"), )}, }, @@ -62,7 +70,7 @@ func TestAddAnnotationsAndLabels(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - if err := addAnnotationsAndLabels(tc.actual, "some-namespace", "rs", tc.gc, tc.commitHash); err != nil { + if err := addAnnotationsAndLabels(tc.actual, syncScope, syncName, tc.gc, tc.commitHash); err != nil { t.Fatalf("Failed to add annotations and labels: %v", err) } if diff := cmp.Diff(tc.expected, tc.actual, ast.CompareFileObject); diff != "" { diff --git a/pkg/parse/root_test.go b/pkg/parse/root_test.go index 95b54a51b9..c8d6fa86fa 100644 --- a/pkg/parse/root_test.go +++ b/pkg/parse/root_test.go @@ -87,6 +87,8 @@ func TestRoot_Parse(t *testing.T) { fakeTime := fakeMetaTime.Time fakeClock := fakeclock.NewFakeClock(fakeTime) + packageID := metadata.PackageID(rootSyncName, configmanagement.ControllerNamespace, configsync.RootSyncKind) + // TODO: Test different source types and inputs fileSource := FileSource{ SourceType: configsync.GitSource, @@ -262,6 +264,7 @@ func TestRoot_Parse(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -274,6 +277,7 @@ func TestRoot_Parse(t *testing.T) { "", core.Name("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(common.LifecycleDeleteAnnotation, common.PreventDeletion), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -303,6 +307,7 @@ func TestRoot_Parse(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -321,6 +326,7 @@ func TestRoot_Parse(t *testing.T) { existingObjects: []client.Object{ fake.NamespaceObject("foo", core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(common.LifecycleDeleteAnnotation, common.PreventDeletion), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -341,6 +347,7 @@ func TestRoot_Parse(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -353,6 +360,7 @@ func TestRoot_Parse(t *testing.T) { "", core.Name("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(common.LifecycleDeleteAnnotation, common.PreventDeletion), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -370,11 +378,12 @@ func TestRoot_Parse(t *testing.T) { namespaceStrategy: configsync.NamespaceStrategyImplicit, existingObjects: []client.Object{fake.NamespaceObject("foo", core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(metadata.ParentPackageIDLabel, metadata.PackageID("other-root-sync", configmanagement.ControllerNamespace, configsync.RootSyncKind)), core.Annotation(common.LifecycleDeleteAnnotation, common.PreventDeletion), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), core.Annotation(metadata.SyncTokenAnnotationKey, testGitCommit), - core.Annotation(metadata.OwningInventoryKey, applier.InventoryID(rootSyncName, configmanagement.ControllerNamespace)), + core.Annotation(metadata.OwningInventoryKey, applier.InventoryID("other-root-sync", configmanagement.ControllerNamespace)), core.Annotation(metadata.ResourceIDKey, "_namespace_foo"), difftest.ManagedBy(declared.RootScope, "other-root-sync")), rootSyncInput.DeepCopy(), @@ -390,6 +399,7 @@ func TestRoot_Parse(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -420,6 +430,7 @@ func TestRoot_Parse(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -450,6 +461,7 @@ func TestRoot_Parse(t *testing.T) { fake.WithRootSyncSourceType(configsync.GitSource), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1beta1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.SourcePathAnnotationKey, fmt.Sprintf("namespaces/%s/test.yaml", configsync.ControllerNamespace)), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -480,6 +492,7 @@ func TestRoot_Parse(t *testing.T) { fake.Role(core.Namespace("bar"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -491,6 +504,7 @@ func TestRoot_Parse(t *testing.T) { fake.ConfigMap(core.Namespace("bar"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/configmap.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -503,6 +517,7 @@ func TestRoot_Parse(t *testing.T) { "", core.Name("bar"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(common.LifecycleDeleteAnnotation, common.PreventDeletion), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -523,6 +538,7 @@ func TestRoot_Parse(t *testing.T) { // bar not exists, should be added as an implicit namespace fake.NamespaceObject("baz", // baz exists and self-managed, should be added as an implicit namespace core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(common.LifecycleDeleteAnnotation, common.PreventDeletion), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -547,6 +563,7 @@ func TestRoot_Parse(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -558,6 +575,7 @@ func TestRoot_Parse(t *testing.T) { fake.Role(core.Namespace("bar"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -569,6 +587,7 @@ func TestRoot_Parse(t *testing.T) { fake.ConfigMap(core.Namespace("bar"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/configmap.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -580,6 +599,7 @@ func TestRoot_Parse(t *testing.T) { fake.Role(core.Namespace("baz"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -591,6 +611,7 @@ func TestRoot_Parse(t *testing.T) { fake.ConfigMap(core.Namespace("baz"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/configmap.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -603,6 +624,7 @@ func TestRoot_Parse(t *testing.T) { "", core.Name("bar"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(common.LifecycleDeleteAnnotation, common.PreventDeletion), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -615,6 +637,7 @@ func TestRoot_Parse(t *testing.T) { "", core.Name("baz"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(common.LifecycleDeleteAnnotation, common.PreventDeletion), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -729,6 +752,8 @@ func TestRoot_Parse(t *testing.T) { } func TestRoot_DeclaredFields(t *testing.T) { + packageID := metadata.PackageID(rootSyncName, configmanagement.ControllerNamespace, configsync.RootSyncKind) + testCases := []struct { name string webhookEnabled bool @@ -741,6 +766,7 @@ func TestRoot_DeclaredFields(t *testing.T) { webhookEnabled: false, existingObjects: []client.Object{fake.NamespaceObject("foo", core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.DeclaredFieldsKey, `{"f:metadata":{"f:annotations":{},"f:labels":{}},"f:rules":{}}`), core.Annotation(metadata.GitContextKey, nilGitContext), @@ -755,6 +781,7 @@ func TestRoot_DeclaredFields(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, nilGitContext), @@ -770,6 +797,7 @@ func TestRoot_DeclaredFields(t *testing.T) { webhookEnabled: true, existingObjects: []client.Object{fake.NamespaceObject("foo", core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.DeclaredFieldsKey, `{"f:metadata":{"f:annotations":{},"f:labels":{}},"f:rules":{}}`), core.Annotation(metadata.GitContextKey, nilGitContext), @@ -785,6 +813,7 @@ func TestRoot_DeclaredFields(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.DeclaredFieldsKey, `{"f:metadata":{"f:annotations":{"f:configmanagement.gke.io/source-path":{}},"f:labels":{"f:configsync.gke.io/declared-version":{}}},"f:rules":{}}`), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), @@ -802,6 +831,7 @@ func TestRoot_DeclaredFields(t *testing.T) { existingObjects: []client.Object{ fake.NamespaceObject("foo", core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, nilGitContext), core.Annotation(metadata.SyncTokenAnnotationKey, ""), @@ -817,6 +847,7 @@ func TestRoot_DeclaredFields(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.DeclaredFieldsKey, `{"f:metadata":{"f:annotations":{"f:configmanagement.gke.io/source-path":{}},"f:labels":{"f:configsync.gke.io/declared-version":{}}},"f:rules":{}}`), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), @@ -834,6 +865,7 @@ func TestRoot_DeclaredFields(t *testing.T) { existingObjects: []client.Object{ fake.NamespaceObject("foo", core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, nilGitContext), core.Annotation(metadata.SyncTokenAnnotationKey, ""), @@ -848,6 +880,7 @@ func TestRoot_DeclaredFields(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, nilGitContext), @@ -982,6 +1015,8 @@ func fakeParseError(err error, gvks ...schema.GroupVersionKind) status.MultiErro } func TestRoot_Parse_Discovery(t *testing.T) { + packageID := metadata.PackageID(rootSyncName, configmanagement.ControllerNamespace, configsync.RootSyncKind) + testCases := []struct { name string parsed []ast.FileObject @@ -1038,6 +1073,7 @@ func TestRoot_Parse_Discovery(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, nilGitContext), @@ -1050,6 +1086,7 @@ func TestRoot_Parse_Discovery(t *testing.T) { "", core.Name("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(common.LifecycleDeleteAnnotation, common.PreventDeletion), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, nilGitContext), @@ -1074,6 +1111,7 @@ func TestRoot_Parse_Discovery(t *testing.T) { fakeCRD(core.Name("anvils.acme.com"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.SourcePathAnnotationKey, "cluster/crd.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, nilGitContext), @@ -1084,6 +1122,7 @@ func TestRoot_Parse_Discovery(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, nilGitContext), @@ -1097,6 +1136,7 @@ func TestRoot_Parse_Discovery(t *testing.T) { core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(metadata.ResourceManagerKey, ":root_my-rs"), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/obj.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), @@ -1109,6 +1149,7 @@ func TestRoot_Parse_Discovery(t *testing.T) { "", core.Name("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(metadata.ParentPackageIDLabel, packageID), core.Annotation(common.LifecycleDeleteAnnotation, common.PreventDeletion), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, nilGitContext), diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go index 6e6d55c0b5..1c6084219e 100644 --- a/pkg/reconciler/reconciler.go +++ b/pkg/reconciler/reconciler.go @@ -32,6 +32,7 @@ import ( "kpt.dev/configsync/pkg/importer/filesystem" "kpt.dev/configsync/pkg/importer/filesystem/cmpath" "kpt.dev/configsync/pkg/importer/reader" + "kpt.dev/configsync/pkg/metadata" "kpt.dev/configsync/pkg/parse" "kpt.dev/configsync/pkg/reconciler/finalizer" "kpt.dev/configsync/pkg/reconciler/namespacecontroller" @@ -195,7 +196,8 @@ func Run(opts Options) { if reconcileTimeout < 0 { klog.Fatalf("Invalid reconcileTimeout: %v, timeout should not be negative", reconcileTimeout) } - clientSet, err := applier.NewClientSet(cl, configFlags, opts.StatusMode) + packageID := metadata.PackageID(opts.SyncName, opts.ReconcilerScope.SyncNamespace(), opts.ReconcilerScope.SyncKind()) + clientSet, err := applier.NewClientSet(cl, configFlags, opts.StatusMode, packageID) if err != nil { klog.Fatalf("Error creating clients: %v", err) } diff --git a/pkg/reconcilermanager/controllers/reconciler_base.go b/pkg/reconcilermanager/controllers/reconciler_base.go index b117bf4d54..16e5a88e10 100644 --- a/pkg/reconcilermanager/controllers/reconciler_base.go +++ b/pkg/reconcilermanager/controllers/reconciler_base.go @@ -711,3 +711,28 @@ func (r *reconcilerBase) isWebhookEnabled(ctx context.Context) (bool, error) { } return err == nil, err } + +func (r *reconcilerBase) patchSyncMetadata(ctx context.Context, rs client.Object) (bool, error) { + packageID := metadata.PackageID(rs.GetName(), rs.GetNamespace(), r.syncKind) + currentPackageID := core.GetLabel(rs, metadata.PackageIDLabel) + if currentPackageID == packageID { + r.logger(ctx).V(5).Info("Sync metadata update skipped: no change") + return false, nil + } + + if r.logger(ctx).V(5).Enabled() { + r.logger(ctx).Info("Updating sync metadata: package ID label", + logFieldResourceVersion, rs.GetResourceVersion(), + "labelKey", metadata.PackageIDLabel, + "labelValueOld", currentPackageID, "labelValue", packageID) + } + + patch := fmt.Sprintf(`{"metadata": {"labels": {%q: %q}}}`, metadata.PackageIDLabel, packageID) + err := r.client.Patch(ctx, rs, client.RawPatch(types.MergePatchType, []byte(patch)), + client.FieldOwner(reconcilermanager.FieldManager)) + if err != nil { + return true, fmt.Errorf("Sync metadata update failed: %w", err) + } + r.logger(ctx).Info("Sync metadata update successful") + return true, nil +} diff --git a/pkg/reconcilermanager/controllers/reposync_controller.go b/pkg/reconcilermanager/controllers/reposync_controller.go index 03151565f4..bf2f7da405 100644 --- a/pkg/reconcilermanager/controllers/reposync_controller.go +++ b/pkg/reconcilermanager/controllers/reposync_controller.go @@ -320,11 +320,15 @@ func (r *RepoSyncReconciler) upsertManagedObjects(ctx context.Context, reconcile } // setup performs the following steps: +// - Patch RepoSync to upsert package-id label // - Create or update managed objects // - Convert any error into RepoSync status conditions // - Update the RepoSync status func (r *RepoSyncReconciler) setup(ctx context.Context, reconcilerRef types.NamespacedName, rs *v1beta1.RepoSync) error { - err := r.upsertManagedObjects(ctx, reconcilerRef, rs) + _, err := r.patchSyncMetadata(ctx, rs) + if err == nil { + err = r.upsertManagedObjects(ctx, reconcilerRef, rs) + } updated, updateErr := r.updateSyncStatus(ctx, rs, reconcilerRef, func(syncObj *v1beta1.RepoSync) error { // Modify the sync status, // but keep the upsert error separate from the status update error. diff --git a/pkg/reconcilermanager/controllers/rootsync_controller.go b/pkg/reconcilermanager/controllers/rootsync_controller.go index 769a0c40b0..10d3a7b700 100644 --- a/pkg/reconcilermanager/controllers/rootsync_controller.go +++ b/pkg/reconcilermanager/controllers/rootsync_controller.go @@ -269,11 +269,15 @@ func (r *RootSyncReconciler) upsertManagedObjects(ctx context.Context, reconcile } // setup performs the following steps: +// - Patch RootSync to upsert package-id label // - Create or update managed objects // - Convert any error into RootSync status conditions // - Update the RootSync status func (r *RootSyncReconciler) setup(ctx context.Context, reconcilerRef types.NamespacedName, rs *v1beta1.RootSync) error { - err := r.upsertManagedObjects(ctx, reconcilerRef, rs) + _, err := r.patchSyncMetadata(ctx, rs) + if err == nil { + err = r.upsertManagedObjects(ctx, reconcilerRef, rs) + } updated, updateErr := r.updateSyncStatus(ctx, rs, reconcilerRef, func(syncObj *v1beta1.RootSync) error { // Modify the sync status, // but keep the upsert error separate from the status update error. diff --git a/pkg/remediator/watch/filteredwatcher_test.go b/pkg/remediator/watch/filteredwatcher_test.go index 4a36927237..f821ed87cd 100644 --- a/pkg/remediator/watch/filteredwatcher_test.go +++ b/pkg/remediator/watch/filteredwatcher_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/require" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/utils/ptr" @@ -346,6 +347,7 @@ func TestFilteredWatcher(t *testing.T) { return <-watches, nil }, conflictHandler: testfake.NewConflictHandler(), + labelSelector: labels.Everything(), } w := NewFiltered(cfg) diff --git a/pkg/remediator/watch/manager.go b/pkg/remediator/watch/manager.go index 6c126672f0..e976464350 100644 --- a/pkg/remediator/watch/manager.go +++ b/pkg/remediator/watch/manager.go @@ -19,10 +19,12 @@ import ( "errors" "sync" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/rest" "k8s.io/klog/v2" "kpt.dev/configsync/pkg/declared" + "kpt.dev/configsync/pkg/metadata" "kpt.dev/configsync/pkg/remediator/conflict" "kpt.dev/configsync/pkg/remediator/queue" "kpt.dev/configsync/pkg/status" @@ -49,6 +51,9 @@ type Manager struct { // watcherFactory is the function to create a watcher. watcherFactory watcherFactory + // labelSelector filters watches + labelSelector labels.Selector + // The following fields are guarded by the mutex. mux sync.RWMutex // watching is true if UpdateWatches has been called @@ -89,6 +94,12 @@ func NewManager(scope declared.Scope, syncName string, cfg *rest.Config, } } + // Only watch & remediate objects applied by this reconciler + packageID := metadata.PackageID(syncName, scope.SyncNamespace(), scope.SyncKind()) + labelSelector := labels.Set{ + metadata.ParentPackageIDLabel: packageID, + }.AsSelector() + return &Manager{ scope: scope, syncName: syncName, @@ -96,6 +107,7 @@ func NewManager(scope declared.Scope, syncName string, cfg *rest.Config, resources: decls, watcherMap: make(map[schema.GroupVersionKind]Runnable), watcherFactory: options.watcherFactory, + labelSelector: labelSelector, queue: q, conflictHandler: ch, }, nil @@ -206,6 +218,7 @@ func (m *Manager) startWatcher(ctx context.Context, gvk schema.GroupVersionKind) scope: m.scope, syncName: m.syncName, conflictHandler: m.conflictHandler, + labelSelector: m.labelSelector, } w, err := m.watcherFactory(cfg) if err != nil { diff --git a/pkg/remediator/watch/manager_test.go b/pkg/remediator/watch/manager_test.go index b3a5366311..4c9741cdc0 100644 --- a/pkg/remediator/watch/manager_test.go +++ b/pkg/remediator/watch/manager_test.go @@ -22,6 +22,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "kpt.dev/configsync/pkg/declared" @@ -38,6 +39,7 @@ func fakeRunnable() Runnable { return watch.NewFake(), nil }, conflictHandler: fake.NewConflictHandler(), + labelSelector: labels.Everything(), } return NewFiltered(cfg) } diff --git a/pkg/remediator/watch/watcher.go b/pkg/remediator/watch/watcher.go index 310c7d8fb6..754f5572a4 100644 --- a/pkg/remediator/watch/watcher.go +++ b/pkg/remediator/watch/watcher.go @@ -18,6 +18,7 @@ import ( "context" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/rest" @@ -38,6 +39,7 @@ type watcherConfig struct { syncName string startWatch WatchFunc conflictHandler conflict.Handler + labelSelector labels.Selector } // watcherFactory knows how to build watch.Runnables. @@ -56,6 +58,9 @@ func watcherFactoryFromListerWatcherFactory(factory ListerWatcherFactory) watche namespace = string(cfg.scope) } lw := factoryPtr(cfg.gvk, namespace) + if cfg.labelSelector != nil { + options.LabelSelector = cfg.labelSelector.String() + } return ListAndWatch(ctx, lw, options) } } diff --git a/pkg/util/env.go b/pkg/util/env.go index c9593f5dfd..f39657f656 100644 --- a/pkg/util/env.go +++ b/pkg/util/env.go @@ -16,6 +16,7 @@ package util import ( "fmt" + "io" "os" "path/filepath" "strconv" @@ -59,7 +60,7 @@ func EnvInt(key string, def int) int { if env := os.Getenv(key); env != "" { val, err := strconv.ParseInt(env, 0, 0) if err != nil { - fmt.Fprintf(os.Stderr, "WARNING: invalid env value (%v): using default, key=%s, val=%q, default=%d\n", err, key, env, def) + MustFprintf(os.Stderr, "WARNING: invalid env value (%v): using default, key=%s, val=%q, default=%d\n", err, key, env, def) return def } return int(val) @@ -73,7 +74,7 @@ func EnvFloat(key string, def float64) float64 { if env := os.Getenv(key); env != "" { val, err := strconv.ParseFloat(env, 64) if err != nil { - fmt.Fprintf(os.Stderr, "WARNING: invalid env value (%v): using default, key=%s, val=%q, default=%f\n", err, key, env, def) + MustFprintf(os.Stderr, "WARNING: invalid env value (%v): using default, key=%s, val=%q, default=%f\n", err, key, env, def) return def } return val @@ -113,3 +114,11 @@ func UpdateSymlink(helmRoot, linkAbsPath, packageDir, oldPackageDir string) erro klog.Infof("symlink %q updates to %q", linkAbsPath, packageDir) return nil } + +// MustFprintf prints to the writer and panics if it errors. +func MustFprintf(writer io.Writer, format string, a ...any) { + _, err := fmt.Fprintf(writer, format, a...) + if err != nil { + panic(fmt.Sprintf("Error writing output: %v", err)) + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 7933eee5d2..f1a5f7276c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1316,8 +1316,8 @@ k8s.io/utils/pointer k8s.io/utils/ptr k8s.io/utils/strings/slices k8s.io/utils/trace -# sigs.k8s.io/cli-utils v0.35.1-0.20240504222723-227a03f4a7f9 -## explicit; go 1.20 +# sigs.k8s.io/cli-utils v0.37.2 +## explicit; go 1.22.0 sigs.k8s.io/cli-utils/pkg/apis/actuation sigs.k8s.io/cli-utils/pkg/apply sigs.k8s.io/cli-utils/pkg/apply/cache diff --git a/vendor/sigs.k8s.io/cli-utils/pkg/apply/applier_builder.go b/vendor/sigs.k8s.io/cli-utils/pkg/apply/applier_builder.go index 09f3523902..e964d110f8 100644 --- a/vendor/sigs.k8s.io/cli-utils/pkg/apply/applier_builder.go +++ b/vendor/sigs.k8s.io/cli-utils/pkg/apply/applier_builder.go @@ -86,3 +86,8 @@ func (b *ApplierBuilder) WithStatusWatcher(statusWatcher watcher.StatusWatcher) b.statusWatcher = statusWatcher return b } + +func (b *ApplierBuilder) WithStatusWatcherFilters(filters *watcher.Filters) *ApplierBuilder { + b.statusWatcherFilters = filters + return b +} diff --git a/vendor/sigs.k8s.io/cli-utils/pkg/apply/builder.go b/vendor/sigs.k8s.io/cli-utils/pkg/apply/builder.go index e252902c9d..75602e74f0 100644 --- a/vendor/sigs.k8s.io/cli-utils/pkg/apply/builder.go +++ b/vendor/sigs.k8s.io/cli-utils/pkg/apply/builder.go @@ -27,6 +27,7 @@ type commonBuilder struct { restConfig *rest.Config unstructuredClientForMapping func(*meta.RESTMapping) (resource.RESTClient, error) statusWatcher watcher.StatusWatcher + statusWatcherFilters *watcher.Filters } func (cb *commonBuilder) finalize() (*commonBuilder, error) { @@ -78,7 +79,15 @@ func (cb *commonBuilder) finalize() (*commonBuilder, error) { cx.unstructuredClientForMapping = cx.factory.UnstructuredClientForMapping } if cx.statusWatcher == nil { - cx.statusWatcher = watcher.NewDefaultStatusWatcher(cx.client, cx.mapper) + statusWatcher := watcher.NewDefaultStatusWatcher(cx.client, cx.mapper) + if cx.statusWatcherFilters != nil { + statusWatcher.Filters = cx.statusWatcherFilters + } + cx.statusWatcher = statusWatcher + } else if cx.statusWatcherFilters != nil { + // If you want to use a custom status watcher with a label selector, + // configure it before injecting the status watcher. + return nil, errors.New("status watcher and status watcher filters must not both be provided") } return &cx, nil } diff --git a/vendor/sigs.k8s.io/cli-utils/pkg/apply/destroyer_builder.go b/vendor/sigs.k8s.io/cli-utils/pkg/apply/destroyer_builder.go index 870859fa1f..6927284dc4 100644 --- a/vendor/sigs.k8s.io/cli-utils/pkg/apply/destroyer_builder.go +++ b/vendor/sigs.k8s.io/cli-utils/pkg/apply/destroyer_builder.go @@ -86,3 +86,8 @@ func (b *DestroyerBuilder) WithStatusWatcher(statusWatcher watcher.StatusWatcher b.statusWatcher = statusWatcher return b } + +func (b *DestroyerBuilder) WithStatusWatcherFilters(filters *watcher.Filters) *DestroyerBuilder { + b.statusWatcherFilters = filters + return b +} diff --git a/vendor/sigs.k8s.io/cli-utils/pkg/apply/mutator/apply_time_mutator.go b/vendor/sigs.k8s.io/cli-utils/pkg/apply/mutator/apply_time_mutator.go index d197b3f414..2481517627 100644 --- a/vendor/sigs.k8s.io/cli-utils/pkg/apply/mutator/apply_time_mutator.go +++ b/vendor/sigs.k8s.io/cli-utils/pkg/apply/mutator/apply_time_mutator.go @@ -229,7 +229,7 @@ func computeStatus(obj *unstructured.Unstructured) cache.ResourceStatus { if err != nil { if klog.V(3).Enabled() { ref := mutation.ResourceReferenceFromUnstructured(obj) - klog.Info("failed to compute object status (%s): %d", ref, err) + klog.Infof("failed to compute object status (%s): %d", ref, err) } return cache.ResourceStatus{ Resource: obj, diff --git a/vendor/sigs.k8s.io/cli-utils/pkg/apply/task/apply_task.go b/vendor/sigs.k8s.io/cli-utils/pkg/apply/task/apply_task.go index e379153acf..48280b223a 100644 --- a/vendor/sigs.k8s.io/cli-utils/pkg/apply/task/apply_task.go +++ b/vendor/sigs.k8s.io/cli-utils/pkg/apply/task/apply_task.go @@ -15,13 +15,13 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/cli-runtime/pkg/genericclioptions" + "k8s.io/cli-runtime/pkg/genericiooptions" "k8s.io/cli-runtime/pkg/resource" "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" "k8s.io/klog/v2" "k8s.io/kubectl/pkg/cmd/apply" cmddelete "k8s.io/kubectl/pkg/cmd/delete" - applyerror "sigs.k8s.io/cli-utils/pkg/apply/error" "sigs.k8s.io/cli-utils/pkg/apply/event" "sigs.k8s.io/cli-utils/pkg/apply/filter" @@ -193,7 +193,7 @@ func newApplyOptions(taskName string, eventChannel chan<- event.Event, serverSid Overwrite: true, // Normally set in apply.NewApplyOptions OpenAPIPatch: true, // Normally set in apply.NewApplyOptions Recorder: genericclioptions.NoopRecorder{}, - IOStreams: genericclioptions.IOStreams{ + IOStreams: genericiooptions.IOStreams{ Out: io.Discard, ErrOut: io.Discard, // TODO: Warning for no lastConfigurationAnnotation // is printed directly to stderr in ApplyOptions. We diff --git a/vendor/sigs.k8s.io/cli-utils/pkg/jsonpath/jsonpath.go b/vendor/sigs.k8s.io/cli-utils/pkg/jsonpath/jsonpath.go index e1ae5ac859..8dc77cd28c 100644 --- a/vendor/sigs.k8s.io/cli-utils/pkg/jsonpath/jsonpath.go +++ b/vendor/sigs.k8s.io/cli-utils/pkg/jsonpath/jsonpath.go @@ -7,14 +7,13 @@ import ( "encoding/json" "fmt" + "github.com/spyzhov/ajson" // Using gopkg.in/yaml.v3 instead of sigs.k8s.io/yaml on purpose. // yaml.v3 correctly parses ints: // https://github.com/kubernetes-sigs/yaml/issues/45 // yaml.v3 Node is also used as input to yqlib. "gopkg.in/yaml.v3" "k8s.io/klog/v2" - - "github.com/spyzhov/ajson" ) // Get evaluates the JSONPath expression to extract values from the input map. @@ -28,7 +27,7 @@ func Get(obj map[string]interface{}, expression string) ([]interface{}, error) { return nil, fmt.Errorf("failed to marshal input to json: %w", err) } - klog.V(7).Info("jsonpath.Get input as json:\n%s", jsonBytes) + klog.V(7).Infof("jsonpath.Get input as json:\n%s", jsonBytes) // parse json into an ajson node root, err := ajson.Unmarshal(jsonBytes) @@ -52,7 +51,7 @@ func Get(obj map[string]interface{}, expression string) ([]interface{}, error) { return nil, fmt.Errorf("failed to marshal jsonpath result to json: %w", err) } - klog.V(7).Info("jsonpath.Get output as json:\n%s", jsonBytes) + klog.V(7).Infof("jsonpath.Get output as json:\n%s", jsonBytes) // parse json back into a Go primitive var value interface{} @@ -77,7 +76,7 @@ func Set(obj map[string]interface{}, expression string, value interface{}) (int, return 0, fmt.Errorf("failed to marshal input to json: %w", err) } - klog.V(7).Info("jsonpath.Set input as json:\n%s", jsonBytes) + klog.V(7).Infof("jsonpath.Set input as json:\n%s", jsonBytes) // parse json into an ajson node root, err := ajson.Unmarshal(jsonBytes) @@ -138,7 +137,7 @@ func Set(obj map[string]interface{}, expression string, value interface{}) (int, return 0, fmt.Errorf("failed to marshal jsonpath result to json: %w", err) } - klog.V(7).Info("jsonpath.Set output as json:\n%s", jsonBytes) + klog.V(7).Infof("jsonpath.Set output as json:\n%s", jsonBytes) // parse json back into the input map err = yaml.Unmarshal(jsonBytes, &obj) diff --git a/vendor/sigs.k8s.io/cli-utils/pkg/kstatus/watcher/default_status_watcher.go b/vendor/sigs.k8s.io/cli-utils/pkg/kstatus/watcher/default_status_watcher.go index 08d3ea97a7..e732fbe4db 100644 --- a/vendor/sigs.k8s.io/cli-utils/pkg/kstatus/watcher/default_status_watcher.go +++ b/vendor/sigs.k8s.io/cli-utils/pkg/kstatus/watcher/default_status_watcher.go @@ -11,6 +11,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader" "sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine" @@ -43,6 +44,15 @@ type DefaultStatusWatcher struct { // required for computing parent object status, to compensate for // controllers that aren't following status conventions. ClusterReader engine.ClusterReader + + // Indexers control how the watch cache is indexed, allowing namespace + // filtering and field selectors. If you watch at namespace scope, you must + // provide the namespace indexer. If you specify a field selector filter, + // you must also provide an indexer for that field. + Indexers cache.Indexers + + // Filters allows filtering the objects being watched. + Filters *Filters } var _ StatusWatcher = &DefaultStatusWatcher{} @@ -60,6 +70,7 @@ func NewDefaultStatusWatcher(dynamicClient dynamic.Interface, mapper meta.RESTMa DynamicClient: dynamicClient, Mapper: mapper, }, + Indexers: DefaultIndexers(), } } @@ -88,13 +99,18 @@ func (w *DefaultStatusWatcher) Watch(ctx context.Context, ids object.ObjMetadata } informer := &ObjectStatusReporter{ - InformerFactory: NewDynamicInformerFactory(w.DynamicClient, w.ResyncPeriod), - Mapper: w.Mapper, - StatusReader: w.StatusReader, - ClusterReader: w.ClusterReader, - Targets: targets, - ObjectFilter: &AllowListObjectFilter{AllowList: ids}, - RESTScope: scope, + InformerFactory: &DynamicInformerFactory{ + Client: w.DynamicClient, + ResyncPeriod: w.ResyncPeriod, + Indexers: w.Indexers, + Filters: w.Filters, + }, + Mapper: w.Mapper, + StatusReader: w.StatusReader, + ClusterReader: w.ClusterReader, + Targets: targets, + ObjectFilter: &AllowListObjectFilter{AllowList: ids}, + RESTScope: scope, } return informer.Start(ctx) } diff --git a/vendor/sigs.k8s.io/cli-utils/pkg/kstatus/watcher/dynamic_informer_factory.go b/vendor/sigs.k8s.io/cli-utils/pkg/kstatus/watcher/dynamic_informer_factory.go index 27810d6050..1e0c95390b 100644 --- a/vendor/sigs.k8s.io/cli-utils/pkg/kstatus/watcher/dynamic_informer_factory.go +++ b/vendor/sigs.k8s.io/cli-utils/pkg/kstatus/watcher/dynamic_informer_factory.go @@ -5,12 +5,16 @@ package watcher import ( "context" + "fmt" "time" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/cache" @@ -20,15 +24,14 @@ type DynamicInformerFactory struct { Client dynamic.Interface ResyncPeriod time.Duration Indexers cache.Indexers + Filters *Filters } func NewDynamicInformerFactory(client dynamic.Interface, resyncPeriod time.Duration) *DynamicInformerFactory { return &DynamicInformerFactory{ Client: client, ResyncPeriod: resyncPeriod, - Indexers: cache.Indexers{ - cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, - }, + Indexers: DefaultIndexers(), } } @@ -36,22 +39,122 @@ func (f *DynamicInformerFactory) NewInformer(ctx context.Context, mapping *meta. // Unstructured example output need `"apiVersion"` and `"kind"` set. example := &unstructured.Unstructured{} example.SetGroupVersionKind(mapping.GroupVersionKind) - return cache.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return f.Client.Resource(mapping.Resource). - Namespace(namespace). - List(ctx, options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return f.Client.Resource(mapping.Resource). - Namespace(namespace). - Watch(ctx, options) - }, - }, + NewFilteredListWatchFromDynamicClient( + ctx, + f.Client, + mapping.Resource, + namespace, + f.Filters, + ), example, f.ResyncPeriod, f.Indexers, ) } + +// DefaultIndexers returns the default set of cache indexers, namely the +// namespace indexer. +func DefaultIndexers() cache.Indexers { + return cache.Indexers{ + cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, + } +} + +// Filters are optional selectors for list and watch +type Filters struct { + Labels labels.Selector + Fields fields.Selector +} + +// NewFilteredListWatchFromDynamicClient creates a new ListWatch from the +// specified client, resource, namespace, and optional filters. +func NewFilteredListWatchFromDynamicClient( + ctx context.Context, + client dynamic.Interface, + resource schema.GroupVersionResource, + namespace string, + filters *Filters, +) *cache.ListWatch { + optionsModifier := func(options *metav1.ListOptions) error { + if filters == nil { + return nil + } + if filters.Labels != nil { + selector := filters.Labels + // Merge label selectors, if both were provided + if options.LabelSelector != "" { + var err error + selector, err = labels.Parse(options.LabelSelector) + if err != nil { + return fmt.Errorf("parsing label selector: %w", err) + } + selector = andLabelSelectors(selector, filters.Labels) + } + options.LabelSelector = selector.String() + } + if filters.Fields != nil { + selector := filters.Fields + // Merge field selectors, if both were provided + if options.FieldSelector != "" { + var err error + selector, err = fields.ParseSelector(options.FieldSelector) + if err != nil { + return fmt.Errorf("parsing field selector: %w", err) + } + selector = fields.AndSelectors(selector, filters.Fields) + } + options.FieldSelector = selector.String() + } + return nil + } + return NewModifiedListWatchFromDynamicClient(ctx, client, resource, namespace, optionsModifier) +} + +// NewModifiedListWatchFromDynamicClient creates a new ListWatch from the +// specified client, resource, namespace, and options modifier. +// Options modifier is a function takes a ListOptions and modifies the consumed +// ListOptions. Provide customized modifier function to apply modification to +// ListOptions with field selectors, label selectors, or any other desired options. +func NewModifiedListWatchFromDynamicClient( + ctx context.Context, + client dynamic.Interface, + resource schema.GroupVersionResource, + namespace string, + optionsModifier func(*metav1.ListOptions) error, +) *cache.ListWatch { + listFunc := func(options metav1.ListOptions) (runtime.Object, error) { + if err := optionsModifier(&options); err != nil { + return nil, fmt.Errorf("modifying list options: %w", err) + } + return client.Resource(resource). + Namespace(namespace). + List(ctx, options) + } + watchFunc := func(options metav1.ListOptions) (watch.Interface, error) { + options.Watch = true + if err := optionsModifier(&options); err != nil { + return nil, fmt.Errorf("modifying watch options: %w", err) + } + return client.Resource(resource). + Namespace(namespace). + Watch(ctx, options) + } + return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc} +} + +func andLabelSelectors(selectors ...labels.Selector) labels.Selector { + var s labels.Selector + for _, item := range selectors { + if s == nil { + s = item + } else { + reqs, selectable := item.Requirements() + if !selectable { + return item // probably the nothing selector + } + s = s.Add(reqs...) + } + } + return s +} diff --git a/vendor/sigs.k8s.io/cli-utils/pkg/kstatus/watcher/object_status_reporter.go b/vendor/sigs.k8s.io/cli-utils/pkg/kstatus/watcher/object_status_reporter.go index a9b0e90ef9..5ce33b4c33 100644 --- a/vendor/sigs.k8s.io/cli-utils/pkg/kstatus/watcher/object_status_reporter.go +++ b/vendor/sigs.k8s.io/cli-utils/pkg/kstatus/watcher/object_status_reporter.go @@ -693,7 +693,7 @@ func (w *ObjectStatusReporter) newStatusCheckTaskFunc( id object.ObjMetadata, ) taskFunc { return func() { - klog.V(5).Infof("Re-reading object status: %s", status.ScheduleWindow, id) + klog.V(5).Infof("Re-reading object status: %v", id) // check again rs, err := w.readStatusFromCluster(ctx, id) if err != nil {