From 857790e024c1a559400f726a5b0c022e33e8c300 Mon Sep 17 00:00:00 2001 From: Karl Isenberg Date: Tue, 9 Jul 2024 17:31:52 -0700 Subject: [PATCH] Add watch filtering by applyset label - Add "applyset.kubernetes.io/part-of" label to managed objects - Add "applyset.kubernetes.io/id" label to RootSyncs/RepoSyncs - Generate ApplySet IDs using kubectl code for compatibility. Note: kubectl impl is slightly different than the KEP, in that it uses "applyset.kubernetes.io" instead of "applyset.k8s.io" --- e2e/testcases/custom_resources_test.go | 14 +- e2e/testcases/managed_resources_test.go | 299 ++++++++++-------- e2e/testcases/multi_sync_test.go | 18 +- e2e/testcases/preserve_fields_test.go | 27 +- pkg/api/configsync/register.go | 12 +- pkg/applier/applier_test.go | 35 +- pkg/applier/clientset.go | 16 +- pkg/core/decorate.go | 9 + pkg/declared/applyset.go | 50 +++ pkg/kinds/resources.go | 19 ++ pkg/metadata/metadata_test.go | 7 + pkg/parse/annotations.go | 3 + pkg/parse/annotations_test.go | 12 +- pkg/parse/root_test.go | 56 +++- pkg/reconciler/reconciler.go | 3 +- .../controllers/reconciler_base.go | 29 ++ .../controllers/reposync_controller.go | 6 +- .../controllers/rootsync_controller.go | 6 +- pkg/remediator/watch/filteredwatcher_test.go | 2 + pkg/remediator/watch/manager.go | 13 + pkg/remediator/watch/manager_test.go | 2 + pkg/remediator/watch/watcher.go | 5 + 22 files changed, 450 insertions(+), 193 deletions(-) create mode 100644 pkg/declared/applyset.go diff --git a/e2e/testcases/custom_resources_test.go b/e2e/testcases/custom_resources_test.go index 66444db2b9..02c9f5fee4 100644 --- a/e2e/testcases/custom_resources_test.go +++ b/e2e/testcases/custom_resources_test.go @@ -109,16 +109,12 @@ func TestCRDDeleteBeforeRemoveCustomResourceV1(t *testing.T) { nt.T.Fatal(err) } - // Wait for remediator to detect the drift (managed Anvil CR was deleted) - // and record it as a conflict. - // TODO: distinguish between management conflict (spec/generation drift) and concurrent status update conflict (resource version change) - err = nomostest.ValidateMetrics(nt, + // Reverting a manual change is NOT considered a "conflict", unless the new + // owner is a different reconciler. + nt.Must(nomostest.ValidateMetrics(nt, nomostest.ReconcilerErrorMetrics(nt, rootSyncLabels, firstCommitHash, metrics.ErrorSummary{ - Conflicts: 1, - })) - if err != nil { - nt.T.Fatal(err) - } + Conflicts: 0, // from the remediator + }))) // Reset discovery client to invalidate the cached Anvil CRD nt.RenewClient() diff --git a/e2e/testcases/managed_resources_test.go b/e2e/testcases/managed_resources_test.go index f8a18d9a83..7ea52dfd4b 100644 --- a/e2e/testcases/managed_resources_test.go +++ b/e2e/testcases/managed_resources_test.go @@ -22,10 +22,15 @@ import ( "time" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + kubectlapply "k8s.io/kubectl/pkg/cmd/apply" nomostesting "kpt.dev/configsync/e2e/nomostest/testing" + "kpt.dev/configsync/e2e/nomostest/testkubeclient" "kpt.dev/configsync/e2e/nomostest/testpredicates" "kpt.dev/configsync/e2e/nomostest/testwatcher" "kpt.dev/configsync/pkg/api/configsync" + "kpt.dev/configsync/pkg/declared" "kpt.dev/configsync/e2e/nomostest" "kpt.dev/configsync/e2e/nomostest/ntopts" @@ -37,19 +42,21 @@ import ( // This file includes tests for drift correction and drift prevention. // -// The drift correction in the mono-repo mode utilizes the following two annotations: -// * configmanagement.gke.io/managed -// * configsync.gke.io/resource-id +// The drift correction in the mono-repo mode utilizes the following metadata: +// * the configmanagement.gke.io/managed annotation +// * the configsync.gke.io/resource-id annotation // -// The drift correction in the multi-repo mode utilizes the following three annotations: -// * configmanagement.gke.io/managed -// * configsync.gke.io/resource-id -// * configsync.gke.io/manager +// The drift correction in the multi-repo mode utilizes the following metadata: +// * the configmanagement.gke.io/managed annotation +// * the configsync.gke.io/resource-id annotation +// * the configsync.gke.io/manager annotation +// * the config.k8s.io/owning-inventory label // // The drift prevention is only supported in the multi-repo mode, and utilizes the following Config Sync metadata: // * the configmanagement.gke.io/managed annotation // * the configsync.gke.io/resource-id annotation // * the configsync.gke.io/declared-version label +// * the config.k8s.io/owning-inventory label // The reason we have both TestKubectlCreatesManagedNamespaceResourceMonoRepo and // TestKubectlCreatesManagedNamespaceResourceMultiRepo is that the mono-repo mode and @@ -70,19 +77,20 @@ func TestKubectlCreatesManagedNamespaceResourceMultiRepo(t *testing.T) { } /* A new test */ - ns := []byte(` -apiVersion: v1 -kind: Namespace -metadata: - name: test-ns1 - annotations: - configmanagement.gke.io/managed: enabled - configsync.gke.io/resource-id: _namespace_test-ns1 - configsync.gke.io/manager: :root -`) + ns1Obj := &unstructured.Unstructured{} + ns1Obj.SetGroupVersionKind(kinds.Namespace()) + ns1Obj.SetName("test-ns1") + ns1Obj.SetAnnotations(map[string]string{ + metadata.ResourceManagementKey: metadata.ResourceManagementEnabled, + metadata.ResourceIDKey: "_namespace_test-ns1", + metadata.ResourceManagerKey: string(declared.RootScope), + }) + ns1Obj.SetLabels(map[string]string{ + kubectlapply.ApplysetPartOfLabel: "root-sync.config-management-system.RootSync", + }) - if err := os.WriteFile(filepath.Join(nt.TmpDir, "test-ns1.yaml"), ns, 0644); err != nil { - nt.T.Fatalf("failed to create a tmp file %v", err) + if err := writeObjectYAMLFile(filepath.Join(nt.TmpDir, "test-ns1.yaml"), ns1Obj, nt.Scheme); err != nil { + nt.T.Fatal(err) } out, err := nt.Shell.Kubectl("apply", "-f", filepath.Join(nt.TmpDir, "test-ns1.yaml")) @@ -98,19 +106,20 @@ metadata: } /* A new test */ - ns = []byte(` -apiVersion: v1 -kind: Namespace -metadata: - name: test-ns2 - annotations: - configmanagement.gke.io/managed: enabled - configsync.gke.io/resource-id: _namespace_test-ns2 - configsync.gke.io/manager: :abcdef -`) + ns2Obj := &unstructured.Unstructured{} + ns2Obj.SetGroupVersionKind(kinds.Namespace()) + ns2Obj.SetName("test-ns2") + ns2Obj.SetAnnotations(map[string]string{ + metadata.ResourceManagementKey: metadata.ResourceManagementEnabled, + metadata.ResourceIDKey: "_namespace_test-ns2", + metadata.ResourceManagerKey: "config-management-system_abcdef", + }) + ns2Obj.SetLabels(map[string]string{ + kubectlapply.ApplysetPartOfLabel: "abcdef.config-management-system.RootSync", + }) - if err := os.WriteFile(filepath.Join(nt.TmpDir, "test-ns2.yaml"), ns, 0644); err != nil { - nt.T.Fatalf("failed to create a tmp file %v", err) + if err := writeObjectYAMLFile(filepath.Join(nt.TmpDir, "test-ns2.yaml"), ns2Obj, nt.Scheme); err != nil { + nt.T.Fatal(err) } out, err = nt.Shell.Kubectl("apply", "-f", filepath.Join(nt.TmpDir, "test-ns2.yaml")) @@ -121,8 +130,10 @@ metadata: // Wait 5 seconds so that the remediator can process the event. time.Sleep(5 * time.Second) - // The `configsync.gke.io/manager` annotation of `test-ns2` suggests that its manager is ':abcdef'. - // The root reconciler does not manage `test-ns2`, therefore should not remove `test-ns2`. + // The `configsync.gke.io/manager` annotation of `test-ns2` suggests that + // its manager is 'config-management-system_abcdef' (RootSync named 'abcdef'). + // The root reconciler (RootSync named 'root-sync') does not manage + // `test-ns2`, therefore should not remove `test-ns2`. err = nt.Validate("test-ns2", "", &corev1.Namespace{}, testpredicates.HasExactlyAnnotationKeys( metadata.ResourceManagementKey, @@ -134,18 +145,16 @@ metadata: } /* A new test */ - ns = []byte(` -apiVersion: v1 -kind: Namespace -metadata: - name: test-ns3 - annotations: - configmanagement.gke.io/managed: enabled - configsync.gke.io/resource-id: _namespace_test-ns3 -`) + ns3Obj := &unstructured.Unstructured{} + ns3Obj.SetGroupVersionKind(kinds.Namespace()) + ns3Obj.SetName("test-ns3") + ns3Obj.SetAnnotations(map[string]string{ + metadata.ResourceManagementKey: metadata.ResourceManagementEnabled, + metadata.ResourceIDKey: "_namespace_test-ns3", + }) - if err := os.WriteFile(filepath.Join(nt.TmpDir, "test-ns3.yaml"), ns, 0644); err != nil { - nt.T.Fatalf("failed to create a tmp file %v", err) + if err := writeObjectYAMLFile(filepath.Join(nt.TmpDir, "test-ns3.yaml"), ns3Obj, nt.Scheme); err != nil { + nt.T.Fatal(err) } out, err = nt.Shell.Kubectl("apply", "-f", filepath.Join(nt.TmpDir, "test-ns3.yaml")) @@ -168,19 +177,17 @@ metadata: } /* A new test */ - ns = []byte(` -apiVersion: v1 -kind: Namespace -metadata: - name: test-ns4 - annotations: - configmanagement.gke.io/managed: enabled - configsync.gke.io/resource-id: _namespace_wrong-ns4 - configsync.gke.io/manager: :root -`) + ns4Obj := &unstructured.Unstructured{} + ns4Obj.SetGroupVersionKind(kinds.Namespace()) + ns4Obj.SetName("test-ns4") + ns4Obj.SetAnnotations(map[string]string{ + metadata.ResourceManagementKey: metadata.ResourceManagementEnabled, + metadata.ResourceIDKey: "_namespace_wrong-ns4", + metadata.ResourceManagerKey: string(declared.RootScope), + }) - if err := os.WriteFile(filepath.Join(nt.TmpDir, "test-ns4.yaml"), ns, 0644); err != nil { - nt.T.Fatalf("failed to create a tmp file %v", err) + if err := writeObjectYAMLFile(filepath.Join(nt.TmpDir, "test-ns4.yaml"), ns4Obj, nt.Scheme); err != nil { + nt.T.Fatal(err) } out, err = nt.Shell.Kubectl("apply", "-f", filepath.Join(nt.TmpDir, "test-ns4.yaml")) @@ -204,6 +211,17 @@ metadata: } } +func writeObjectYAMLFile(path string, obj *unstructured.Unstructured, scheme *runtime.Scheme) error { + nsBytes, err := testkubeclient.SerializeObject(obj, ".yaml", scheme) + if err != nil { + return fmt.Errorf("failed to format yaml: %w", err) + } + if err := os.WriteFile(path, nsBytes, 0644); err != nil { + return fmt.Errorf("failed to create a tmp file: %w", err) + } + return nil +} + // TestKubectlCreatesManagedConfigMapResource tests the drift correction regarding kubectl // tries to create a managed non-namespace resource in both the mono-repo mode and the multi-repo mode. func TestKubectlCreatesManagedConfigMapResource(t *testing.T) { @@ -223,22 +241,24 @@ func TestKubectlCreatesManagedConfigMapResource(t *testing.T) { } /* A new test */ - cm := []byte(` -apiVersion: v1 -kind: ConfigMap -metadata: - name: test-cm1 - namespace: bookstore - annotations: - configmanagement.gke.io/managed: enabled - configsync.gke.io/resource-id: _configmap_bookstore_test-cm1 - configsync.gke.io/manager: :root -data: - weekday: "monday" -`) - - if err := os.WriteFile(filepath.Join(nt.TmpDir, "test-cm1.yaml"), cm, 0644); err != nil { - nt.T.Fatalf("failed to create a tmp file %v", err) + cm1Obj := &unstructured.Unstructured{} + cm1Obj.SetGroupVersionKind(kinds.ConfigMap()) + cm1Obj.SetName("test-cm1") + cm1Obj.SetNamespace("bookstore") + cm1Obj.SetAnnotations(map[string]string{ + metadata.ResourceManagementKey: metadata.ResourceManagementEnabled, + metadata.ResourceIDKey: "_configmap_bookstore_test-cm1", + metadata.ResourceManagerKey: string(declared.RootScope), + }) + cm1Obj.SetLabels(map[string]string{ + kubectlapply.ApplysetPartOfLabel: "root-sync.config-management-system.RootSync", + }) + cm1Obj.Object["data"] = map[string]string{ + "weekday": "monday", + } + + if err := writeObjectYAMLFile(filepath.Join(nt.TmpDir, "test-cm1.yaml"), cm1Obj, nt.Scheme); err != nil { + nt.T.Fatal(err) } out, err := nt.Shell.Kubectl("apply", "-f", filepath.Join(nt.TmpDir, "test-cm1.yaml")) @@ -253,22 +273,24 @@ data: } /* A new test */ - cm = []byte(` -apiVersion: v1 -kind: ConfigMap -metadata: - name: test-cm2 - namespace: bookstore - annotations: - configmanagement.gke.io/managed: enabled - configsync.gke.io/resource-id: _configmap_bookstore_wrong-cm2 - configsync.gke.io/manager: :root -data: - weekday: "monday" -`) - - if err := os.WriteFile(filepath.Join(nt.TmpDir, "test-cm2.yaml"), cm, 0644); err != nil { - nt.T.Fatalf("failed to create a tmp file %v", err) + cm2Obj := &unstructured.Unstructured{} + cm2Obj.SetGroupVersionKind(kinds.ConfigMap()) + cm2Obj.SetName("test-cm2") + cm2Obj.SetNamespace("bookstore") + cm2Obj.SetAnnotations(map[string]string{ + metadata.ResourceManagementKey: metadata.ResourceManagementEnabled, + metadata.ResourceIDKey: "_configmap_bookstore_wrong-cm2", + metadata.ResourceManagerKey: string(declared.RootScope), + }) + cm2Obj.SetLabels(map[string]string{ + kubectlapply.ApplysetPartOfLabel: "root-sync.config-management-system.RootSync", + }) + cm2Obj.Object["data"] = map[string]string{ + "weekday": "monday", + } + + if err := writeObjectYAMLFile(filepath.Join(nt.TmpDir, "test-cm2.yaml"), cm2Obj, nt.Scheme); err != nil { + nt.T.Fatal(err) } out, err = nt.Shell.Kubectl("apply", "-f", filepath.Join(nt.TmpDir, "test-cm2.yaml")) @@ -292,22 +314,24 @@ data: } /* A new test */ - cm = []byte(` -apiVersion: v1 -kind: ConfigMap -metadata: - name: test-cm3 - namespace: bookstore - annotations: - configmanagement.gke.io/managed: enabled - configsync.gke.io/resource-id: _configmap_bookstore_test-cm3 - configsync.gke.io/manager: :abcdef -data: - weekday: "monday" -`) - - if err := os.WriteFile(filepath.Join(nt.TmpDir, "test-cm3.yaml"), cm, 0644); err != nil { - nt.T.Fatalf("failed to create a tmp file %v", err) + cm3Obj := &unstructured.Unstructured{} + cm3Obj.SetGroupVersionKind(kinds.ConfigMap()) + cm3Obj.SetName("test-cm3") + cm3Obj.SetNamespace("bookstore") + cm3Obj.SetAnnotations(map[string]string{ + metadata.ResourceManagementKey: metadata.ResourceManagementEnabled, + metadata.ResourceIDKey: "_configmap_bookstore_test-cm3", + metadata.ResourceManagerKey: "config-management-system_abcdef", + }) + cm3Obj.SetLabels(map[string]string{ + kubectlapply.ApplysetPartOfLabel: "abcdef.config-management-system.RootSync", + }) + cm3Obj.Object["data"] = map[string]string{ + "weekday": "monday", + } + + if err := writeObjectYAMLFile(filepath.Join(nt.TmpDir, "test-cm3.yaml"), cm3Obj, nt.Scheme); err != nil { + nt.T.Fatal(err) } out, err = nt.Shell.Kubectl("apply", "-f", filepath.Join(nt.TmpDir, "test-cm3.yaml")) @@ -318,8 +342,10 @@ data: // Wait 5 seconds so that the reconciler can process the event. time.Sleep(5 * time.Second) - // The `configsync.gke.io/manager` annotation of `test-ns3` suggests that its manager is ':abcdef'. - // The root reconciler does not manage `test-ns3`, therefore should not remove `test-ns3`. + // The `configsync.gke.io/manager` annotation of `test-ns3` suggests that + // its manager is 'config-management-system_abcdef' (RootSync named 'abcdef'). + // The root reconciler (RootSync named 'root-sync') does not manage `test-ns3`, + // therefore should not remove `test-ns3`. err = nt.Validate("test-cm3", "bookstore", &corev1.ConfigMap{}, testpredicates.HasExactlyAnnotationKeys( metadata.ResourceManagementKey, @@ -331,21 +357,23 @@ data: } /* A new test */ - cm = []byte(` -apiVersion: v1 -kind: ConfigMap -metadata: - name: test-cm4 - namespace: bookstore - annotations: - configmanagement.gke.io/managed: enabled - configsync.gke.io/resource-id: _configmap_bookstore_test-cm4 -data: - weekday: "monday" -`) - - if err := os.WriteFile(filepath.Join(nt.TmpDir, "test-cm4.yaml"), cm, 0644); err != nil { - nt.T.Fatalf("failed to create a tmp file %v", err) + cm4Obj := &unstructured.Unstructured{} + cm4Obj.SetGroupVersionKind(kinds.ConfigMap()) + cm4Obj.SetName("test-cm4") + cm4Obj.SetNamespace("bookstore") + cm4Obj.SetAnnotations(map[string]string{ + metadata.ResourceManagementKey: metadata.ResourceManagementEnabled, + metadata.ResourceIDKey: "_configmap_bookstore_test-cm4", + }) + cm4Obj.SetLabels(map[string]string{ + kubectlapply.ApplysetPartOfLabel: "root-sync.config-management-system.RootSync", + }) + cm4Obj.Object["data"] = map[string]string{ + "weekday": "monday", + } + + if err := writeObjectYAMLFile(filepath.Join(nt.TmpDir, "test-cm4.yaml"), cm4Obj, nt.Scheme); err != nil { + nt.T.Fatal(err) } out, err = nt.Shell.Kubectl("apply", "-f", filepath.Join(nt.TmpDir, "test-cm4.yaml")) @@ -368,20 +396,21 @@ data: } /* A new test */ - cm = []byte(` -apiVersion: v1 -kind: Secret -metadata: - name: test-secret - namespace: bookstore - annotations: - configmanagement.gke.io/managed: enabled - configsync.gke.io/resource-id: _configmap_bookstore_test-secret - configsync.gke.io/manager: :root -`) - - if err := os.WriteFile(filepath.Join(nt.TmpDir, "test-secret.yaml"), cm, 0644); err != nil { - nt.T.Fatalf("failed to create a tmp file %v", err) + secretObj := &unstructured.Unstructured{} + secretObj.SetGroupVersionKind(kinds.Secret()) + secretObj.SetName("test-secret") + secretObj.SetNamespace("bookstore") + secretObj.SetAnnotations(map[string]string{ + metadata.ResourceManagementKey: metadata.ResourceManagementEnabled, + metadata.ResourceIDKey: "_configmap_bookstore_test-secret", + metadata.ResourceManagerKey: string(declared.RootScope), + }) + secretObj.SetLabels(map[string]string{ + kubectlapply.ApplysetPartOfLabel: "root-sync.config-management-system.RootSync", + }) + + if err := writeObjectYAMLFile(filepath.Join(nt.TmpDir, "test-secret.yaml"), secretObj, nt.Scheme); err != nil { + nt.T.Fatal(err) } out, err = nt.Shell.Kubectl("apply", "-f", filepath.Join(nt.TmpDir, "test-secret.yaml")) diff --git a/e2e/testcases/multi_sync_test.go b/e2e/testcases/multi_sync_test.go index 362700ad73..5cb8b7a797 100644 --- a/e2e/testcases/multi_sync_test.go +++ b/e2e/testcases/multi_sync_test.go @@ -453,20 +453,20 @@ func TestConflictingDefinitions_NamespaceToRoot(t *testing.T) { } // Validate reconciler error metric is emitted from namespace reconciler. - rootSyncLabels, err := nomostest.MetricLabelsForRepoSync(nt, repoSyncNN) + repoSyncLabels, err := nomostest.MetricLabelsForRepoSync(nt, repoSyncNN) if err != nil { nt.T.Fatal(err) } commitHash := nt.NonRootRepos[repoSyncNN].MustHash(nt.T) - err = nomostest.ValidateMetrics(nt, - nomostest.ReconcilerSyncError(nt, rootSyncLabels, commitHash), - nomostest.ReconcilerErrorMetrics(nt, rootSyncLabels, commitHash, metrics.ErrorSummary{ - Sync: 1, - })) - if err != nil { - nt.T.Fatal(err) - } + // Reverting a manual change IS considered a "conflict" when the new + // owner is a different reconciler. + nt.Must(nomostest.ValidateMetrics(nt, + nomostest.ReconcilerSyncError(nt, repoSyncLabels, commitHash), + nomostest.ReconcilerErrorMetrics(nt, repoSyncLabels, commitHash, metrics.ErrorSummary{ + Conflicts: 1, // from the remediator + Sync: 1, // remediator conflict error replaces applier conflict error + }))) nt.T.Logf("Ensure the Role matches the one in the Root repo %s", configsync.RootSyncName) err = nt.Validate("pods", testNs, &rbacv1.Role{}, diff --git a/e2e/testcases/preserve_fields_test.go b/e2e/testcases/preserve_fields_test.go index 30c8ca13c7..40081fb83f 100644 --- a/e2e/testcases/preserve_fields_test.go +++ b/e2e/testcases/preserve_fields_test.go @@ -24,6 +24,7 @@ import ( corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/util/intstr" + kubectlapply "k8s.io/kubectl/pkg/cmd/apply" "kpt.dev/configsync/e2e/nomostest" "kpt.dev/configsync/e2e/nomostest/metrics" "kpt.dev/configsync/e2e/nomostest/retry" @@ -333,7 +334,11 @@ func TestAddUpdateDeleteLabels(t *testing.T) { nt.T.Fatal(err) } - var defaultLabels = []string{metadata.ManagedByKey, metadata.DeclaredVersionLabel} + var defaultLabels = []string{ + metadata.ManagedByKey, + metadata.DeclaredVersionLabel, + kubectlapply.ApplysetPartOfLabel, + } // Checking that the configmap with no labels appears on cluster, and // that no user labels are specified @@ -350,12 +355,13 @@ func TestAddUpdateDeleteLabels(t *testing.T) { nt.T.Fatal(err) } - // Checking that label is updated after syncing an update. - err = nt.Validate(cmName, ns, &corev1.ConfigMap{}, - testpredicates.HasExactlyLabelKeys(append(defaultLabels, "baz")...)) - if err != nil { - nt.T.Fatal(err) - } + var updatedLabels []string + updatedLabels = append(updatedLabels, defaultLabels...) + updatedLabels = append(updatedLabels, "baz") + + // Checking that label was added after syncing. + nt.Must(nt.Validate(cmName, ns, &corev1.ConfigMap{}, + testpredicates.HasExactlyLabelKeys(updatedLabels...))) delete(cm.Labels, "baz") nt.Must(nt.RootRepos[configsync.RootSyncName].Add(cmPath, cm)) @@ -365,11 +371,8 @@ func TestAddUpdateDeleteLabels(t *testing.T) { } // Check that the label is deleted after syncing. - err = nt.Validate(cmName, ns, &corev1.ConfigMap{}, - testpredicates.HasExactlyLabelKeys(metadata.ManagedByKey, metadata.DeclaredVersionLabel)) - if err != nil { - nt.T.Fatal(err) - } + nt.Must(nt.Validate(cmName, ns, &corev1.ConfigMap{}, + testpredicates.HasExactlyLabelKeys(defaultLabels...))) nt.MetricsExpectations.AddObjectApply(configsync.RootSyncKind, rootSyncNN, nsObj) nt.MetricsExpectations.AddObjectApply(configsync.RootSyncKind, rootSyncNN, cm) diff --git a/pkg/api/configsync/register.go b/pkg/api/configsync/register.go index 7f302643ff..0c18b068f2 100644 --- a/pkg/api/configsync/register.go +++ b/pkg/api/configsync/register.go @@ -14,7 +14,9 @@ package configsync -import "time" +import ( + "time" +) const ( // GroupName is the name of the group of configsync resources. @@ -29,6 +31,14 @@ const ( // ControllerNamespace is the Namespace used for Nomos controllers ControllerNamespace = "config-management-system" + + // ApplySetToolingName is the name used to represent Config Sync in the + // ApplySet tooling annotation. + ApplySetToolingName = GroupName + + // ApplySetToolingVersion is the version used to represent Config Sync in + // the ApplySet tooling annotation. + ApplySetToolingVersion = "v1beta1" ) // API type constants diff --git a/pkg/applier/applier_test.go b/pkg/applier/applier_test.go index 209cc1a465..5694084110 100644 --- a/pkg/applier/applier_test.go +++ b/pkg/applier/applier_test.go @@ -27,6 +27,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" + kubectlapply "k8s.io/kubectl/pkg/cmd/apply" "kpt.dev/configsync/pkg/api/configsync/v1beta1" "kpt.dev/configsync/pkg/applier/stats" "kpt.dev/configsync/pkg/core" @@ -89,19 +90,20 @@ func TestApply(t *testing.T) { abandonObj := deploymentObj.DeepCopy() abandonObj.SetName("abandon-me") abandonObj.SetAnnotations(map[string]string{ - common.LifecycleDeleteAnnotation: common.PreventDeletion, + common.LifecycleDeleteAnnotation: common.PreventDeletion, // not removed metadata.ResourceManagementKey: metadata.ResourceManagementEnabled, metadata.ResourceIDKey: core.GKNN(abandonObj), metadata.ResourceManagerKey: resourceManager, metadata.OwningInventoryKey: "anything", metadata.SyncTokenAnnotationKey: "anything", - "example-to-not-delete": "anything", + "example-to-not-delete": "anything", // not removed }) abandonObj.SetLabels(map[string]string{ - metadata.ManagedByKey: metadata.ManagedByValue, - metadata.SystemLabel: "anything", - metadata.ArchLabel: "anything", - "example-to-not-delete": "anything", + metadata.ManagedByKey: metadata.ManagedByValue, + metadata.SystemLabel: "anything", + metadata.ArchLabel: "anything", + kubectlapply.ApplysetPartOfLabel: "anything", // not removed + "example-to-not-delete": "anything", // not removed }) abandonObjID := core.IDOf(abandonObj) @@ -322,15 +324,18 @@ func TestApply(t *testing.T) { expectedServerObjs: []client.Object{ func() client.Object { obj := abandonObj.DeepCopy() - obj.SetAnnotations(map[string]string{ - common.LifecycleDeleteAnnotation: common.PreventDeletion, - // all configsync annotations removed - "example-to-not-delete": "anything", - }) - obj.SetLabels(map[string]string{ - // all configsync labels removed - "example-to-not-delete": "anything", - }) + // all configsync annotations removed + core.RemoveAnnotations(obj, + metadata.ResourceManagementKey, + metadata.ResourceIDKey, + metadata.ResourceManagerKey, + metadata.OwningInventoryKey, + metadata.SyncTokenAnnotationKey) + // all configsync labels removed + core.RemoveLabels(obj, + metadata.ManagedByKey, + metadata.SystemLabel, + metadata.ArchLabel) obj.SetUID("1") obj.SetResourceVersion("2") obj.SetGeneration(1) diff --git a/pkg/applier/clientset.go b/pkg/applier/clientset.go index f46194c055..fe9bca22e2 100644 --- a/pkg/applier/clientset.go +++ b/pkg/applier/clientset.go @@ -19,12 +19,15 @@ import ( "github.com/GoogleContainerTools/kpt/pkg/live" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/labels" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/klog/v2" + kubectlapply "k8s.io/kubectl/pkg/cmd/apply" "k8s.io/kubectl/pkg/cmd/util" "sigs.k8s.io/cli-utils/pkg/apply" "sigs.k8s.io/cli-utils/pkg/apply/event" "sigs.k8s.io/cli-utils/pkg/inventory" + "sigs.k8s.io/cli-utils/pkg/kstatus/watcher" "sigs.k8s.io/cli-utils/pkg/object" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -53,7 +56,7 @@ type ClientSet struct { } // NewClientSet constructs a new ClientSet. -func NewClientSet(c client.Client, configFlags *genericclioptions.ConfigFlags, statusMode string) (*ClientSet, error) { +func NewClientSet(c client.Client, configFlags *genericclioptions.ConfigFlags, statusMode, applySetID string) (*ClientSet, error) { matchVersionKubeConfigFlags := util.NewMatchVersionFlags(configFlags) f := util.NewFactory(matchVersionKubeConfigFlags) @@ -71,9 +74,19 @@ func NewClientSet(c client.Client, configFlags *genericclioptions.ConfigFlags, s return nil, err } + // Only watch objects applied by this reconciler for status updates. + // This reduces both the number of events processed and the memory used by + // the informer cache. + watchFilters := &watcher.Filters{ + Labels: labels.Set{ + kubectlapply.ApplysetPartOfLabel: applySetID, + }.AsSelector(), + } + applier, err := apply.NewApplierBuilder(). WithInventoryClient(invClient). WithFactory(f). + WithStatusWatcherFilters(watchFilters). Build() if err != nil { return nil, err @@ -82,6 +95,7 @@ func NewClientSet(c client.Client, configFlags *genericclioptions.ConfigFlags, s destroyer, err := apply.NewDestroyerBuilder(). WithInventoryClient(invClient). WithFactory(f). + WithStatusWatcherFilters(watchFilters). Build() if err != nil { return nil, err diff --git a/pkg/core/decorate.go b/pkg/core/decorate.go index 2ff3dc7100..8e27bf6925 100644 --- a/pkg/core/decorate.go +++ b/pkg/core/decorate.go @@ -68,6 +68,15 @@ func RemoveAnnotations(obj client.Object, annotations ...string) { obj.SetAnnotations(as) } +// RemoveLabels removes the passed set of labels from obj. +func RemoveLabels(obj client.Object, labels ...string) { + actual := obj.GetLabels() + for _, label := range labels { + delete(actual, label) + } + obj.SetLabels(actual) +} + // AddAnnotations adds the specified annotations to the object. func AddAnnotations(obj client.Object, annotations map[string]string) { existing := obj.GetAnnotations() diff --git a/pkg/declared/applyset.go b/pkg/declared/applyset.go new file mode 100644 index 0000000000..33aa15fec2 --- /dev/null +++ b/pkg/declared/applyset.go @@ -0,0 +1,50 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package declared + +import ( + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/cli-runtime/pkg/resource" + kubectlapply "k8s.io/kubectl/pkg/cmd/apply" + "kpt.dev/configsync/pkg/api/configsync" + "kpt.dev/configsync/pkg/kinds" +) + +// ApplySetIDForSync generates an ApplySet ID for the RootSync or RepoSync as +// an ApplySet parent. +func ApplySetIDForSync(syncName string, syncScope Scope) string { + return ApplySetForSync(syncName, syncScope, nil, nil).ID() +} + +// ApplySetForSync constructs a new ApplySet for the specified RootSync or +// RepoSync. The RESTMapper & RESTClient are optional, depending on which +// methods you plan to call. +func ApplySetForSync(syncName string, syncScope Scope, mapper meta.RESTMapper, client resource.RESTClient) *kubectlapply.ApplySet { + tooling := kubectlapply.ApplySetTooling{ + Name: configsync.ApplySetToolingName, + Version: configsync.ApplySetToolingVersion, + } + parent := &kubectlapply.ApplySetParentRef{ + Name: syncName, + Namespace: syncScope.SyncNamespace(), + } + switch syncScope { + case RootScope: + parent.RESTMapping = kinds.RootSyncRESTMapping() + default: + parent.RESTMapping = kinds.RepoSyncRESTMapping() + } + return kubectlapply.NewApplySet(parent, tooling, mapper, client) +} diff --git a/pkg/kinds/resources.go b/pkg/kinds/resources.go index d2ede2d237..e0d3e3e16b 100644 --- a/pkg/kinds/resources.go +++ b/pkg/kinds/resources.go @@ -16,6 +16,7 @@ package kinds import ( appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime/schema" configsyncv1beta1 "kpt.dev/configsync/pkg/api/configsync/v1beta1" ) @@ -34,3 +35,21 @@ func RootSyncResource() schema.GroupVersionResource { func RepoSyncResource() schema.GroupVersionResource { return configsyncv1beta1.SchemeGroupVersion.WithResource("reposyncs") } + +// RootSyncRESTMapping returns the canonical RootSync RESTMapping. +func RootSyncRESTMapping() *meta.RESTMapping { + return &meta.RESTMapping{ + Resource: RootSyncResource(), + GroupVersionKind: RootSyncV1Beta1(), + Scope: meta.RESTScopeNamespace, + } +} + +// RepoSyncRESTMapping returns the canonical RepoSync RESTMapping. +func RepoSyncRESTMapping() *meta.RESTMapping { + return &meta.RESTMapping{ + Resource: RepoSyncResource(), + GroupVersionKind: RepoSyncV1Beta1(), + Scope: meta.RESTScopeNamespace, + } +} diff --git a/pkg/metadata/metadata_test.go b/pkg/metadata/metadata_test.go index 667f730de1..5bbf385dcc 100644 --- a/pkg/metadata/metadata_test.go +++ b/pkg/metadata/metadata_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + kubectlapply "k8s.io/kubectl/pkg/cmd/apply" "kpt.dev/configsync/pkg/api/configmanagement" "kpt.dev/configsync/pkg/core" "kpt.dev/configsync/pkg/kinds" @@ -105,6 +106,12 @@ func TestHasConfigSyncMetadata(t *testing.T) { core.Label(metadata.ManagedByKey, "random-value")), want: false, }, + { + name: "An object with the `applyset.kubernetes.io/part-of` label", + obj: fake.UnstructuredObject(kinds.Deployment(), core.Name("deploy"), + core.Label(kubectlapply.ApplysetPartOfLabel, "random-value")), + want: false, + }, } for _, tc := range testcases { diff --git a/pkg/parse/annotations.go b/pkg/parse/annotations.go index a0865011f8..b83a1b3b93 100644 --- a/pkg/parse/annotations.go +++ b/pkg/parse/annotations.go @@ -18,6 +18,7 @@ import ( "encoding/json" "fmt" + kubectlapply "k8s.io/kubectl/pkg/cmd/apply" "kpt.dev/configsync/pkg/applier" "kpt.dev/configsync/pkg/core" "kpt.dev/configsync/pkg/declared" @@ -37,10 +38,12 @@ func addAnnotationsAndLabels(objs []ast.FileObject, scope declared.Scope, syncNa if err != nil { return fmt.Errorf("marshaling sourceContext: %w", err) } + applySetID := declared.ApplySetIDForSync(syncName, scope) inventoryID := applier.InventoryID(syncName, scope.SyncNamespace()) manager := declared.ResourceManager(scope, syncName) for _, obj := range objs { core.SetLabel(obj, metadata.ManagedByKey, metadata.ManagedByValue) + core.SetLabel(obj, kubectlapply.ApplysetPartOfLabel, applySetID) core.SetAnnotation(obj, metadata.GitContextKey, string(gcVal)) core.SetAnnotation(obj, metadata.ResourceManagerKey, manager) core.SetAnnotation(obj, metadata.SyncTokenAnnotationKey, commitHash) diff --git a/pkg/parse/annotations_test.go b/pkg/parse/annotations_test.go index fc92985267..5db2551265 100644 --- a/pkg/parse/annotations_test.go +++ b/pkg/parse/annotations_test.go @@ -18,14 +18,21 @@ import ( "testing" "github.com/google/go-cmp/cmp" + kubectlapply "k8s.io/kubectl/pkg/cmd/apply" "kpt.dev/configsync/pkg/applier" "kpt.dev/configsync/pkg/core" + "kpt.dev/configsync/pkg/declared" "kpt.dev/configsync/pkg/importer/analyzer/ast" "kpt.dev/configsync/pkg/metadata" "kpt.dev/configsync/pkg/testing/fake" ) func TestAddAnnotationsAndLabels(t *testing.T) { + syncName := "rs" + syncNamespace := "some-namespace" + syncScope := declared.ScopeFromSyncNamespace(syncNamespace) + applySetID := declared.ApplySetIDForSync(syncName, syncScope) + testcases := []struct { name string actual []ast.FileObject @@ -50,11 +57,12 @@ func TestAddAnnotationsAndLabels(t *testing.T) { expected: []ast.FileObject{fake.Role( core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.ResourceManagementKey, "enabled"), core.Annotation(metadata.ResourceManagerKey, "some-namespace_rs"), core.Annotation(metadata.SyncTokenAnnotationKey, "1234567"), core.Annotation(metadata.GitContextKey, `{"repo":"git@github.com/foo","branch":"main","rev":"HEAD"}`), - core.Annotation(metadata.OwningInventoryKey, applier.InventoryID("rs", "some-namespace")), + core.Annotation(metadata.OwningInventoryKey, applier.InventoryID(syncName, syncNamespace)), core.Annotation(metadata.ResourceIDKey, "rbac.authorization.k8s.io_role_foo_default-name"), )}, }, @@ -62,7 +70,7 @@ func TestAddAnnotationsAndLabels(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - if err := addAnnotationsAndLabels(tc.actual, "some-namespace", "rs", tc.gc, tc.commitHash); err != nil { + if err := addAnnotationsAndLabels(tc.actual, syncScope, syncName, tc.gc, tc.commitHash); err != nil { t.Fatalf("Failed to add annotations and labels: %v", err) } if diff := cmp.Diff(tc.expected, tc.actual, ast.CompareFileObject); diff != "" { diff --git a/pkg/parse/root_test.go b/pkg/parse/root_test.go index 95b54a51b9..a082669776 100644 --- a/pkg/parse/root_test.go +++ b/pkg/parse/root_test.go @@ -28,6 +28,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" + kubectlapply "k8s.io/kubectl/pkg/cmd/apply" "k8s.io/utils/clock" fakeclock "k8s.io/utils/clock/testing" "k8s.io/utils/ptr" @@ -69,6 +70,7 @@ const ( rootReconcilerName = "root-reconciler-my-rs" nilGitContext = `{"repo":""}` testGitCommit = "example-commit" + otherRootSyncName = "other-root-sync" ) func gitSpec(repo string, auth configsync.AuthType) core.MetaMutator { @@ -87,6 +89,9 @@ func TestRoot_Parse(t *testing.T) { fakeTime := fakeMetaTime.Time fakeClock := fakeclock.NewFakeClock(fakeTime) + applySetID := declared.ApplySetIDForSync(rootSyncName, declared.RootScope) + otherRootSyncApplySetID := declared.ApplySetIDForSync(otherRootSyncName, declared.RootScope) + // TODO: Test different source types and inputs fileSource := FileSource{ SourceType: configsync.GitSource, @@ -262,6 +267,7 @@ func TestRoot_Parse(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -274,6 +280,7 @@ func TestRoot_Parse(t *testing.T) { "", core.Name("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(common.LifecycleDeleteAnnotation, common.PreventDeletion), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -303,6 +310,7 @@ func TestRoot_Parse(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -321,6 +329,7 @@ func TestRoot_Parse(t *testing.T) { existingObjects: []client.Object{ fake.NamespaceObject("foo", core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(common.LifecycleDeleteAnnotation, common.PreventDeletion), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -341,6 +350,7 @@ func TestRoot_Parse(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -353,6 +363,7 @@ func TestRoot_Parse(t *testing.T) { "", core.Name("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(common.LifecycleDeleteAnnotation, common.PreventDeletion), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -370,13 +381,14 @@ func TestRoot_Parse(t *testing.T) { namespaceStrategy: configsync.NamespaceStrategyImplicit, existingObjects: []client.Object{fake.NamespaceObject("foo", core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(kubectlapply.ApplysetPartOfLabel, otherRootSyncApplySetID), core.Annotation(common.LifecycleDeleteAnnotation, common.PreventDeletion), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), core.Annotation(metadata.SyncTokenAnnotationKey, testGitCommit), - core.Annotation(metadata.OwningInventoryKey, applier.InventoryID(rootSyncName, configmanagement.ControllerNamespace)), + core.Annotation(metadata.OwningInventoryKey, applier.InventoryID(otherRootSyncName, configmanagement.ControllerNamespace)), core.Annotation(metadata.ResourceIDKey, "_namespace_foo"), - difftest.ManagedBy(declared.RootScope, "other-root-sync")), + difftest.ManagedBy(declared.RootScope, otherRootSyncName)), rootSyncInput.DeepCopy(), }, parseOutputs: []fsfake.ParserOutputs{ @@ -390,6 +402,7 @@ func TestRoot_Parse(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -420,6 +433,7 @@ func TestRoot_Parse(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -450,6 +464,7 @@ func TestRoot_Parse(t *testing.T) { fake.WithRootSyncSourceType(configsync.GitSource), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1beta1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.SourcePathAnnotationKey, fmt.Sprintf("namespaces/%s/test.yaml", configsync.ControllerNamespace)), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -480,6 +495,7 @@ func TestRoot_Parse(t *testing.T) { fake.Role(core.Namespace("bar"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -491,6 +507,7 @@ func TestRoot_Parse(t *testing.T) { fake.ConfigMap(core.Namespace("bar"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/configmap.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -503,6 +520,7 @@ func TestRoot_Parse(t *testing.T) { "", core.Name("bar"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(common.LifecycleDeleteAnnotation, common.PreventDeletion), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -523,6 +541,7 @@ func TestRoot_Parse(t *testing.T) { // bar not exists, should be added as an implicit namespace fake.NamespaceObject("baz", // baz exists and self-managed, should be added as an implicit namespace core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(common.LifecycleDeleteAnnotation, common.PreventDeletion), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -547,6 +566,7 @@ func TestRoot_Parse(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -558,6 +578,7 @@ func TestRoot_Parse(t *testing.T) { fake.Role(core.Namespace("bar"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -569,6 +590,7 @@ func TestRoot_Parse(t *testing.T) { fake.ConfigMap(core.Namespace("bar"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/configmap.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -580,6 +602,7 @@ func TestRoot_Parse(t *testing.T) { fake.Role(core.Namespace("baz"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -591,6 +614,7 @@ func TestRoot_Parse(t *testing.T) { fake.ConfigMap(core.Namespace("baz"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/configmap.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -603,6 +627,7 @@ func TestRoot_Parse(t *testing.T) { "", core.Name("bar"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(common.LifecycleDeleteAnnotation, common.PreventDeletion), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -615,6 +640,7 @@ func TestRoot_Parse(t *testing.T) { "", core.Name("baz"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(common.LifecycleDeleteAnnotation, common.PreventDeletion), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, gitContextOutput), @@ -729,6 +755,8 @@ func TestRoot_Parse(t *testing.T) { } func TestRoot_DeclaredFields(t *testing.T) { + applySetID := declared.ApplySetIDForSync(rootSyncName, declared.RootScope) + testCases := []struct { name string webhookEnabled bool @@ -741,13 +769,14 @@ func TestRoot_DeclaredFields(t *testing.T) { webhookEnabled: false, existingObjects: []client.Object{fake.NamespaceObject("foo", core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.DeclaredFieldsKey, `{"f:metadata":{"f:annotations":{},"f:labels":{}},"f:rules":{}}`), core.Annotation(metadata.GitContextKey, nilGitContext), core.Annotation(metadata.SyncTokenAnnotationKey, ""), core.Annotation(metadata.OwningInventoryKey, applier.InventoryID(rootSyncName, configmanagement.ControllerNamespace)), core.Annotation(metadata.ResourceIDKey, "_namespace_foo"), - difftest.ManagedBy(declared.RootScope, "other-root-sync"))}, + difftest.ManagedBy(declared.RootScope, otherRootSyncName))}, parsed: []ast.FileObject{ fake.Role(core.Namespace("foo")), }, @@ -755,6 +784,7 @@ func TestRoot_DeclaredFields(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, nilGitContext), @@ -770,13 +800,14 @@ func TestRoot_DeclaredFields(t *testing.T) { webhookEnabled: true, existingObjects: []client.Object{fake.NamespaceObject("foo", core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.DeclaredFieldsKey, `{"f:metadata":{"f:annotations":{},"f:labels":{}},"f:rules":{}}`), core.Annotation(metadata.GitContextKey, nilGitContext), core.Annotation(metadata.SyncTokenAnnotationKey, ""), core.Annotation(metadata.OwningInventoryKey, applier.InventoryID(rootSyncName, configmanagement.ControllerNamespace)), core.Annotation(metadata.ResourceIDKey, "_namespace_foo"), - difftest.ManagedBy(declared.RootScope, "other-root-sync")), + difftest.ManagedBy(declared.RootScope, otherRootSyncName)), }, parsed: []ast.FileObject{ fake.Role(core.Namespace("foo")), @@ -785,6 +816,7 @@ func TestRoot_DeclaredFields(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.DeclaredFieldsKey, `{"f:metadata":{"f:annotations":{"f:configmanagement.gke.io/source-path":{}},"f:labels":{"f:configsync.gke.io/declared-version":{}}},"f:rules":{}}`), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), @@ -802,12 +834,13 @@ func TestRoot_DeclaredFields(t *testing.T) { existingObjects: []client.Object{ fake.NamespaceObject("foo", core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, nilGitContext), core.Annotation(metadata.SyncTokenAnnotationKey, ""), core.Annotation(metadata.OwningInventoryKey, applier.InventoryID(rootSyncName, configmanagement.ControllerNamespace)), core.Annotation(metadata.ResourceIDKey, "_namespace_foo"), - difftest.ManagedBy(declared.RootScope, "other-root-sync")), + difftest.ManagedBy(declared.RootScope, otherRootSyncName)), fake.AdmissionWebhookObject(webhookconfiguration.Name), }, parsed: []ast.FileObject{ @@ -817,6 +850,7 @@ func TestRoot_DeclaredFields(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.DeclaredFieldsKey, `{"f:metadata":{"f:annotations":{"f:configmanagement.gke.io/source-path":{}},"f:labels":{"f:configsync.gke.io/declared-version":{}}},"f:rules":{}}`), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), @@ -834,12 +868,13 @@ func TestRoot_DeclaredFields(t *testing.T) { existingObjects: []client.Object{ fake.NamespaceObject("foo", core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, nilGitContext), core.Annotation(metadata.SyncTokenAnnotationKey, ""), core.Annotation(metadata.OwningInventoryKey, applier.InventoryID(rootSyncName, configmanagement.ControllerNamespace)), core.Annotation(metadata.ResourceIDKey, "_namespace_foo"), - difftest.ManagedBy(declared.RootScope, "other-root-sync")), + difftest.ManagedBy(declared.RootScope, otherRootSyncName)), }, parsed: []ast.FileObject{ fake.Role(core.Namespace("foo")), @@ -848,6 +883,7 @@ func TestRoot_DeclaredFields(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, nilGitContext), @@ -982,6 +1018,8 @@ func fakeParseError(err error, gvks ...schema.GroupVersionKind) status.MultiErro } func TestRoot_Parse_Discovery(t *testing.T) { + applySetID := declared.ApplySetIDForSync(rootSyncName, declared.RootScope) + testCases := []struct { name string parsed []ast.FileObject @@ -1038,6 +1076,7 @@ func TestRoot_Parse_Discovery(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, nilGitContext), @@ -1050,6 +1089,7 @@ func TestRoot_Parse_Discovery(t *testing.T) { "", core.Name("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(common.LifecycleDeleteAnnotation, common.PreventDeletion), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, nilGitContext), @@ -1074,6 +1114,7 @@ func TestRoot_Parse_Discovery(t *testing.T) { fakeCRD(core.Name("anvils.acme.com"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.SourcePathAnnotationKey, "cluster/crd.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, nilGitContext), @@ -1084,6 +1125,7 @@ func TestRoot_Parse_Discovery(t *testing.T) { fake.Role(core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/foo/role.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, nilGitContext), @@ -1097,6 +1139,7 @@ func TestRoot_Parse_Discovery(t *testing.T) { core.Namespace("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), core.Label(metadata.DeclaredVersionLabel, "v1"), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(metadata.ResourceManagerKey, ":root_my-rs"), core.Annotation(metadata.SourcePathAnnotationKey, "namespaces/obj.yaml"), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), @@ -1109,6 +1152,7 @@ func TestRoot_Parse_Discovery(t *testing.T) { "", core.Name("foo"), core.Label(metadata.ManagedByKey, metadata.ManagedByValue), + core.Label(kubectlapply.ApplysetPartOfLabel, applySetID), core.Annotation(common.LifecycleDeleteAnnotation, common.PreventDeletion), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementEnabled), core.Annotation(metadata.GitContextKey, nilGitContext), diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go index 6e6d55c0b5..9a1b5be868 100644 --- a/pkg/reconciler/reconciler.go +++ b/pkg/reconciler/reconciler.go @@ -195,7 +195,8 @@ func Run(opts Options) { if reconcileTimeout < 0 { klog.Fatalf("Invalid reconcileTimeout: %v, timeout should not be negative", reconcileTimeout) } - clientSet, err := applier.NewClientSet(cl, configFlags, opts.StatusMode) + applySetID := declared.ApplySetIDForSync(opts.SyncName, opts.ReconcilerScope) + clientSet, err := applier.NewClientSet(cl, configFlags, opts.StatusMode, applySetID) if err != nil { klog.Fatalf("Error creating clients: %v", err) } diff --git a/pkg/reconcilermanager/controllers/reconciler_base.go b/pkg/reconcilermanager/controllers/reconciler_base.go index b117bf4d54..f41dfeaf39 100644 --- a/pkg/reconcilermanager/controllers/reconciler_base.go +++ b/pkg/reconcilermanager/controllers/reconciler_base.go @@ -34,11 +34,13 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/json" "k8s.io/client-go/dynamic" + kubectlapply "k8s.io/kubectl/pkg/cmd/apply" "k8s.io/utils/ptr" "kpt.dev/configsync/pkg/api/configsync" "kpt.dev/configsync/pkg/api/configsync/v1beta1" hubv1 "kpt.dev/configsync/pkg/api/hub/v1" "kpt.dev/configsync/pkg/core" + "kpt.dev/configsync/pkg/declared" "kpt.dev/configsync/pkg/kinds" "kpt.dev/configsync/pkg/metadata" "kpt.dev/configsync/pkg/reconcilermanager" @@ -711,3 +713,30 @@ func (r *reconcilerBase) isWebhookEnabled(ctx context.Context) (bool, error) { } return err == nil, err } + +func (r *reconcilerBase) patchSyncMetadata(ctx context.Context, rs client.Object) (bool, error) { + syncScope := declared.ScopeFromSyncNamespace(rs.GetNamespace()) + applySetID := declared.ApplySetIDForSync(rs.GetName(), syncScope) + + currentApplySetID := core.GetLabel(rs, kubectlapply.ApplySetParentIDLabel) + if currentApplySetID == applySetID { + r.logger(ctx).V(5).Info("Sync metadata update skipped: no change") + return false, nil + } + + if r.logger(ctx).V(5).Enabled() { + r.logger(ctx).Info("Updating sync metadata: package ID label", + logFieldResourceVersion, rs.GetResourceVersion(), + "labelKey", kubectlapply.ApplySetParentIDLabel, + "labelValueOld", currentApplySetID, "labelValue", applySetID) + } + + patch := fmt.Sprintf(`{"metadata": {"labels": {%q: %q}}}`, kubectlapply.ApplySetParentIDLabel, applySetID) + err := r.client.Patch(ctx, rs, client.RawPatch(types.MergePatchType, []byte(patch)), + client.FieldOwner(reconcilermanager.FieldManager)) + if err != nil { + return true, fmt.Errorf("Sync metadata update failed: %w", err) + } + r.logger(ctx).Info("Sync metadata update successful") + return true, nil +} diff --git a/pkg/reconcilermanager/controllers/reposync_controller.go b/pkg/reconcilermanager/controllers/reposync_controller.go index 03151565f4..bf2f7da405 100644 --- a/pkg/reconcilermanager/controllers/reposync_controller.go +++ b/pkg/reconcilermanager/controllers/reposync_controller.go @@ -320,11 +320,15 @@ func (r *RepoSyncReconciler) upsertManagedObjects(ctx context.Context, reconcile } // setup performs the following steps: +// - Patch RepoSync to upsert package-id label // - Create or update managed objects // - Convert any error into RepoSync status conditions // - Update the RepoSync status func (r *RepoSyncReconciler) setup(ctx context.Context, reconcilerRef types.NamespacedName, rs *v1beta1.RepoSync) error { - err := r.upsertManagedObjects(ctx, reconcilerRef, rs) + _, err := r.patchSyncMetadata(ctx, rs) + if err == nil { + err = r.upsertManagedObjects(ctx, reconcilerRef, rs) + } updated, updateErr := r.updateSyncStatus(ctx, rs, reconcilerRef, func(syncObj *v1beta1.RepoSync) error { // Modify the sync status, // but keep the upsert error separate from the status update error. diff --git a/pkg/reconcilermanager/controllers/rootsync_controller.go b/pkg/reconcilermanager/controllers/rootsync_controller.go index 769a0c40b0..10d3a7b700 100644 --- a/pkg/reconcilermanager/controllers/rootsync_controller.go +++ b/pkg/reconcilermanager/controllers/rootsync_controller.go @@ -269,11 +269,15 @@ func (r *RootSyncReconciler) upsertManagedObjects(ctx context.Context, reconcile } // setup performs the following steps: +// - Patch RootSync to upsert package-id label // - Create or update managed objects // - Convert any error into RootSync status conditions // - Update the RootSync status func (r *RootSyncReconciler) setup(ctx context.Context, reconcilerRef types.NamespacedName, rs *v1beta1.RootSync) error { - err := r.upsertManagedObjects(ctx, reconcilerRef, rs) + _, err := r.patchSyncMetadata(ctx, rs) + if err == nil { + err = r.upsertManagedObjects(ctx, reconcilerRef, rs) + } updated, updateErr := r.updateSyncStatus(ctx, rs, reconcilerRef, func(syncObj *v1beta1.RootSync) error { // Modify the sync status, // but keep the upsert error separate from the status update error. diff --git a/pkg/remediator/watch/filteredwatcher_test.go b/pkg/remediator/watch/filteredwatcher_test.go index 4a36927237..f821ed87cd 100644 --- a/pkg/remediator/watch/filteredwatcher_test.go +++ b/pkg/remediator/watch/filteredwatcher_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/require" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/utils/ptr" @@ -346,6 +347,7 @@ func TestFilteredWatcher(t *testing.T) { return <-watches, nil }, conflictHandler: testfake.NewConflictHandler(), + labelSelector: labels.Everything(), } w := NewFiltered(cfg) diff --git a/pkg/remediator/watch/manager.go b/pkg/remediator/watch/manager.go index 6c126672f0..6cd17c6326 100644 --- a/pkg/remediator/watch/manager.go +++ b/pkg/remediator/watch/manager.go @@ -19,9 +19,11 @@ import ( "errors" "sync" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/rest" "k8s.io/klog/v2" + kubectlapply "k8s.io/kubectl/pkg/cmd/apply" "kpt.dev/configsync/pkg/declared" "kpt.dev/configsync/pkg/remediator/conflict" "kpt.dev/configsync/pkg/remediator/queue" @@ -49,6 +51,9 @@ type Manager struct { // watcherFactory is the function to create a watcher. watcherFactory watcherFactory + // labelSelector filters watches + labelSelector labels.Selector + // The following fields are guarded by the mutex. mux sync.RWMutex // watching is true if UpdateWatches has been called @@ -89,6 +94,12 @@ func NewManager(scope declared.Scope, syncName string, cfg *rest.Config, } } + // Only watch & remediate objects applied by this reconciler + applySetID := declared.ApplySetIDForSync(syncName, scope) + labelSelector := labels.Set{ + kubectlapply.ApplysetPartOfLabel: applySetID, + }.AsSelector() + return &Manager{ scope: scope, syncName: syncName, @@ -96,6 +107,7 @@ func NewManager(scope declared.Scope, syncName string, cfg *rest.Config, resources: decls, watcherMap: make(map[schema.GroupVersionKind]Runnable), watcherFactory: options.watcherFactory, + labelSelector: labelSelector, queue: q, conflictHandler: ch, }, nil @@ -206,6 +218,7 @@ func (m *Manager) startWatcher(ctx context.Context, gvk schema.GroupVersionKind) scope: m.scope, syncName: m.syncName, conflictHandler: m.conflictHandler, + labelSelector: m.labelSelector, } w, err := m.watcherFactory(cfg) if err != nil { diff --git a/pkg/remediator/watch/manager_test.go b/pkg/remediator/watch/manager_test.go index b3a5366311..4c9741cdc0 100644 --- a/pkg/remediator/watch/manager_test.go +++ b/pkg/remediator/watch/manager_test.go @@ -22,6 +22,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "kpt.dev/configsync/pkg/declared" @@ -38,6 +39,7 @@ func fakeRunnable() Runnable { return watch.NewFake(), nil }, conflictHandler: fake.NewConflictHandler(), + labelSelector: labels.Everything(), } return NewFiltered(cfg) } diff --git a/pkg/remediator/watch/watcher.go b/pkg/remediator/watch/watcher.go index 310c7d8fb6..754f5572a4 100644 --- a/pkg/remediator/watch/watcher.go +++ b/pkg/remediator/watch/watcher.go @@ -18,6 +18,7 @@ import ( "context" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/rest" @@ -38,6 +39,7 @@ type watcherConfig struct { syncName string startWatch WatchFunc conflictHandler conflict.Handler + labelSelector labels.Selector } // watcherFactory knows how to build watch.Runnables. @@ -56,6 +58,9 @@ func watcherFactoryFromListerWatcherFactory(factory ListerWatcherFactory) watche namespace = string(cfg.scope) } lw := factoryPtr(cfg.gvk, namespace) + if cfg.labelSelector != nil { + options.LabelSelector = cfg.labelSelector.String() + } return ListAndWatch(ctx, lw, options) } }