diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go
index 2904c10c38b..0d1adfe2570 100644
--- a/pkg/controller/operators/catalog/operator.go
+++ b/pkg/controller/operators/catalog/operator.go
@@ -382,10 +382,14 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	op.lister.RbacV1().RegisterRoleLister(metav1.NamespaceAll, roleInformer.Lister())
 	sharedIndexInformers = append(sharedIndexInformers, roleInformer.Informer())
 
-	labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync queueinformer.LegacySyncHandler) error {
+	complete := map[schema.GroupVersionResource]bool{}
+	completeLock := &sync.RWMutex{}
+
+	labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync func(done func() bool) queueinformer.LegacySyncHandler) error {
 		if canFilter {
 			return nil
 		}
+		complete[gvr] = false
 		op.k8sLabelQueueSets[gvr] = workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
 			Name: gvr.String(),
 		})
@@ -393,7 +397,16 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 			ctx,
 			queueinformer.WithLogger(op.logger),
 			queueinformer.WithInformer(informer),
-			queueinformer.WithSyncer(sync.ToSyncer()),
+			queueinformer.WithSyncer(sync(func() bool {
+				completeLock.Lock()
+				complete[gvr] = true
+				allDone := true
+				for _, done := range complete {
+					allDone = allDone && done
+				}
+				completeLock.Unlock()
+				return allDone
+			}).ToSyncer()),
 		)
 		if err != nil {
 			return err
@@ -409,6 +422,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	rolesgvk := rbacv1.SchemeGroupVersion.WithResource("roles")
 	if err := labelObjects(rolesgvk, roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
 		ctx, op.logger, labeller.Filter(rolesgvk),
+		roleInformer.Lister().List,
 		rbacv1applyconfigurations.Role,
 		func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.Role, error) {
 			return op.opClient.KubernetesInterface().RbacV1().Roles(namespace).Apply(ctx, cfg, opts)
@@ -425,6 +439,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	rolebindingsgvk := rbacv1.SchemeGroupVersion.WithResource("rolebindings")
 	if err := labelObjects(rolebindingsgvk, roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
 		ctx, op.logger, labeller.Filter(rolebindingsgvk),
+		roleBindingInformer.Lister().List,
 		rbacv1applyconfigurations.RoleBinding,
 		func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleBindingApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.RoleBinding, error) {
 			return op.opClient.KubernetesInterface().RbacV1().RoleBindings(namespace).Apply(ctx, cfg, opts)
@@ -441,6 +456,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	serviceaccountsgvk := corev1.SchemeGroupVersion.WithResource("serviceaccounts")
 	if err := labelObjects(serviceaccountsgvk, serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
 		ctx, op.logger, labeller.Filter(serviceaccountsgvk),
+		serviceAccountInformer.Lister().List,
 		corev1applyconfigurations.ServiceAccount,
 		func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceAccountApplyConfiguration, opts metav1.ApplyOptions) (*corev1.ServiceAccount, error) {
 			return op.opClient.KubernetesInterface().CoreV1().ServiceAccounts(namespace).Apply(ctx, cfg, opts)
@@ -457,6 +473,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	servicesgvk := corev1.SchemeGroupVersion.WithResource("services")
 	if err := labelObjects(servicesgvk, serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
 		ctx, op.logger, labeller.Filter(servicesgvk),
+		serviceInformer.Lister().List,
 		corev1applyconfigurations.Service,
 		func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Service, error) {
 			return op.opClient.KubernetesInterface().CoreV1().Services(namespace).Apply(ctx, cfg, opts)
@@ -482,6 +499,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	podsgvk := corev1.SchemeGroupVersion.WithResource("pods")
 	if err := labelObjects(podsgvk, csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
 		ctx, op.logger, labeller.Filter(podsgvk),
+		csPodInformer.Lister().List,
 		corev1applyconfigurations.Pod,
 		func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.PodApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Pod, error) {
 			return op.opClient.KubernetesInterface().CoreV1().Pods(namespace).Apply(ctx, cfg, opts)
@@ -519,6 +537,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 		ctx, op.logger, labeller.JobFilter(func(namespace, name string) (metav1.Object, error) {
 			return configMapInformer.Lister().ConfigMaps(namespace).Get(name)
 		}),
+		jobInformer.Lister().List,
 		batchv1applyconfigurations.Job,
 		func(namespace string, ctx context.Context, cfg *batchv1applyconfigurations.JobApplyConfiguration, opts metav1.ApplyOptions) (*batchv1.Job, error) {
 			return op.opClient.KubernetesInterface().BatchV1().Jobs(namespace).Apply(ctx, cfg, opts)
@@ -594,6 +613,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	customresourcedefinitionsgvk := apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")
 	if err := labelObjects(customresourcedefinitionsgvk, crdInformer, labeller.ObjectPatchLabeler(
 		ctx, op.logger, labeller.Filter(customresourcedefinitionsgvk),
+		crdLister.List,
 		op.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Patch,
 	)); err != nil {
 		return nil, err
diff --git a/pkg/controller/operators/labeller/labels.go b/pkg/controller/operators/labeller/labels.go
index 8c2af67c9d1..b6de0af2b3c 100644
--- a/pkg/controller/operators/labeller/labels.go
+++ b/pkg/controller/operators/labeller/labels.go
@@ -10,6 +10,7 @@ import (
 	"github.com/sirupsen/logrus"
 	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 
@@ -40,28 +41,49 @@ func ObjectLabeler[T metav1.Object, A ApplyConfig[A]](
 	ctx context.Context,
 	logger *logrus.Logger,
 	check func(metav1.Object) bool,
+	list func(options labels.Selector) ([]T, error),
 	applyConfigFor func(name, namespace string) A,
 	apply func(namespace string, ctx context.Context, cfg A, opts metav1.ApplyOptions) (T, error),
-) queueinformer.LegacySyncHandler {
-	return func(obj interface{}) error {
-		cast, ok := obj.(T)
-		if !ok {
-			err := fmt.Errorf("wrong type %T, expected %T: %#v", obj, new(T), obj)
-			logger.WithError(err).Error("casting failed")
-			return fmt.Errorf("casting failed: %w", err)
-		}
+) func(done func() bool) queueinformer.LegacySyncHandler {
+	return func(done func() bool) queueinformer.LegacySyncHandler {
+		return func(obj interface{}) error {
+			cast, ok := obj.(T)
+			if !ok {
+				err := fmt.Errorf("wrong type %T, expected %T: %#v", obj, new(T), obj)
+				logger.WithError(err).Error("casting failed")
+				return fmt.Errorf("casting failed: %w", err)
+			}
 
-		if !check(cast) || hasLabel(cast) {
-			return nil
-		}
+			if !check(cast) || hasLabel(cast) {
+				// if the object we're processing does not need us to label it, it's possible that every object that requires
+				// the label already has it; in which case we should exit the process, so the Pod that succeeds us can filter
+				// the informers used to drive the controller and stop having to track extraneous objects
+				items, err := list(labels.Everything())
+				if err != nil {
+					logger.WithError(err).Warn("failed to list all objects to check for labelling completion")
+					return nil
+				}
+				gvrFullyLabelled := true
+				for _, item := range items {
+					gvrFullyLabelled = gvrFullyLabelled && (!check(item) || hasLabel(item))
+				}
+				if gvrFullyLabelled {
+					allObjectsLabelled := done()
+					if allObjectsLabelled {
+						logrus.Fatal("detected that every object is labelled, exiting...")
+					}
+				}
+				return nil
+			}
 
-		cfg := applyConfigFor(cast.GetName(), cast.GetNamespace())
-		cfg.WithLabels(map[string]string{
-			install.OLMManagedLabelKey: install.OLMManagedLabelValue,
-		})
+			cfg := applyConfigFor(cast.GetName(), cast.GetNamespace())
+			cfg.WithLabels(map[string]string{
+				install.OLMManagedLabelKey: install.OLMManagedLabelValue,
+			})
 
-		_, err := apply(cast.GetNamespace(), ctx, cfg, metav1.ApplyOptions{})
-		return err
+			_, err := apply(cast.GetNamespace(), ctx, cfg, metav1.ApplyOptions{})
+			return err
+		}
 	}
 }
 
@@ -71,58 +93,77 @@ func ObjectPatchLabeler(
 	ctx context.Context,
 	logger *logrus.Logger,
 	check func(metav1.Object) bool,
+	list func(selector labels.Selector) (ret []*metav1.PartialObjectMetadata, err error),
 	patch func(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *apiextensionsv1.CustomResourceDefinition, err error),
-) func(
-	obj interface{},
-) error {
-	return func(obj interface{}) error {
-		cast, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
-		if !ok {
-			err := fmt.Errorf("wrong type %T, expected %T: %#v", obj, new(*apiextensionsv1.CustomResourceDefinition), obj)
-			logger.WithError(err).Error("casting failed")
-			return fmt.Errorf("casting failed: %w", err)
-		}
+) func(done func() bool) queueinformer.LegacySyncHandler {
+	return func(done func() bool) queueinformer.LegacySyncHandler {
+		return func(obj interface{}) error {
+			cast, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
+			if !ok {
+				err := fmt.Errorf("wrong type %T, expected %T: %#v", obj, new(*apiextensionsv1.CustomResourceDefinition), obj)
+				logger.WithError(err).Error("casting failed")
+				return fmt.Errorf("casting failed: %w", err)
+			}
 
-		if !check(cast) || hasLabel(cast) {
-			return nil
-		}
+			if !check(cast) || hasLabel(cast) {
+				// if the object we're processing does not need us to label it, it's possible that every object that requires
+				// the label already has it; in which case we should exit the process, so the Pod that succeeds us can filter
+				// the informers used to drive the controller and stop having to track extraneous objects
+				items, err := list(labels.Everything())
+				if err != nil {
+					logger.WithError(err).Warn("failed to list all objects to check for labelling completion")
+					return nil
+				}
+				gvrFullyLabelled := true
+				for _, item := range items {
+					gvrFullyLabelled = gvrFullyLabelled && (!check(item) || hasLabel(item))
+				}
+				if gvrFullyLabelled {
+					allObjectsLabelled := done()
+					if allObjectsLabelled {
+						logrus.Fatal("detected that every object is labelled, exiting...")
+					}
+				}
+				return nil
+			}
 
-		uid := cast.GetUID()
-		rv := cast.GetResourceVersion()
+			uid := cast.GetUID()
+			rv := cast.GetResourceVersion()
 
-		// to ensure they appear in the patch as preconditions
-		previous := cast.DeepCopy()
-		previous.SetUID("")
-		previous.SetResourceVersion("")
+			// to ensure they appear in the patch as preconditions
+			previous := cast.DeepCopy()
+			previous.SetUID("")
+			previous.SetResourceVersion("")
 
-		oldData, err := json.Marshal(previous)
-		if err != nil {
-			return fmt.Errorf("failed to Marshal old data for %s/%s: %w", previous.GetNamespace(), previous.GetName(), err)
-		}
+			oldData, err := json.Marshal(previous)
+			if err != nil {
+				return fmt.Errorf("failed to Marshal old data for %s/%s: %w", previous.GetNamespace(), previous.GetName(), err)
+			}
 
-		// to ensure they appear in the patch as preconditions
-		updated := cast.DeepCopy()
-		updated.SetUID(uid)
-		updated.SetResourceVersion(rv)
-		labels := updated.GetLabels()
-		if labels == nil {
-			labels = map[string]string{}
-		}
-		labels[install.OLMManagedLabelKey] = install.OLMManagedLabelValue
-		updated.SetLabels(labels)
+			// to ensure they appear in the patch as preconditions
+			updated := cast.DeepCopy()
+			updated.SetUID(uid)
+			updated.SetResourceVersion(rv)
+			labels := updated.GetLabels()
+			if labels == nil {
+				labels = map[string]string{}
+			}
+			labels[install.OLMManagedLabelKey] = install.OLMManagedLabelValue
+			updated.SetLabels(labels)
 
-		newData, err := json.Marshal(updated)
-		if err != nil {
-			return fmt.Errorf("failed to Marshal old data for %s/%s: %w", updated.GetNamespace(), updated.GetName(), err)
-		}
+			newData, err := json.Marshal(updated)
+			if err != nil {
+				return fmt.Errorf("failed to Marshal old data for %s/%s: %w", updated.GetNamespace(), updated.GetName(), err)
+			}
 
-		patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
-		if err != nil {
-			return fmt.Errorf("failed to create patch for %s/%s: %w", cast.GetNamespace(), cast.GetName(), err)
-		}
+			patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
+			if err != nil {
+				return fmt.Errorf("failed to create patch for %s/%s: %w", cast.GetNamespace(), cast.GetName(), err)
+			}
 
-		_, err = patch(ctx, cast.GetName(), types.MergePatchType, patchBytes, metav1.PatchOptions{})
-		return err
+			_, err = patch(ctx, cast.GetName(), types.MergePatchType, patchBytes, metav1.PatchOptions{})
+			return err
+		}
 	}
 }
diff --git a/pkg/controller/operators/olm/operator.go b/pkg/controller/operators/olm/operator.go
index f7afa1ed6df..b9acd69288a 100644
--- a/pkg/controller/operators/olm/operator.go
+++ b/pkg/controller/operators/olm/operator.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
@@ -449,10 +450,14 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
 		}
 	}
 
-	labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync queueinformer.LegacySyncHandler) error {
+	complete := map[schema.GroupVersionResource]bool{}
+	completeLock := &sync.RWMutex{}
+
+	labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync func(done func() bool) queueinformer.LegacySyncHandler) error {
 		if canFilter {
 			return nil
 		}
+		complete[gvr] = false
 		op.k8sLabelQueueSets[gvr] = workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
 			Name: gvr.String(),
 		})
@@ -460,7 +465,16 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
 			ctx,
 			queueinformer.WithLogger(op.logger),
 			queueinformer.WithInformer(informer),
-			queueinformer.WithSyncer(sync.ToSyncer()),
+			queueinformer.WithSyncer(sync(func() bool {
+				completeLock.Lock()
+				complete[gvr] = true
+				allDone := true
+				for _, done := range complete {
+					allDone = allDone && done
+				}
+				completeLock.Unlock()
+				return allDone
+			}).ToSyncer()),
 		)
 		if err != nil {
 			return err
@@ -476,6 +490,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
 	deploymentsgvk := appsv1.SchemeGroupVersion.WithResource("deployments")
 	if err := labelObjects(deploymentsgvk, informersByNamespace[metav1.NamespaceAll].DeploymentInformer.Informer(), labeller.ObjectLabeler[*appsv1.Deployment, *appsv1applyconfigurations.DeploymentApplyConfiguration](
 		ctx, op.logger, labeller.Filter(deploymentsgvk),
+		informersByNamespace[metav1.NamespaceAll].DeploymentInformer.Lister().List,
 		appsv1applyconfigurations.Deployment,
 		func(namespace string, ctx context.Context, cfg *appsv1applyconfigurations.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (*appsv1.Deployment, error) {
 			return op.opClient.KubernetesInterface().AppsV1().Deployments(namespace).Apply(ctx, cfg, opts)
@@ -548,6 +563,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
 	clusterrolesgvk := rbacv1.SchemeGroupVersion.WithResource("clusterroles")
 	if err := labelObjects(clusterrolesgvk, clusterRoleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRole, *rbacv1applyconfigurations.ClusterRoleApplyConfiguration](
 		ctx, op.logger, labeller.Filter(clusterrolesgvk),
+		clusterRoleInformer.Lister().List,
 		func(name, _ string) *rbacv1applyconfigurations.ClusterRoleApplyConfiguration {
 			return rbacv1applyconfigurations.ClusterRole(name)
 		},
@@ -577,6 +593,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
 	clusterrolebindingssgvk := rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings")
 	if err := labelObjects(clusterrolebindingssgvk, clusterRoleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRoleBinding, *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration](
 		ctx, op.logger, labeller.Filter(clusterrolebindingssgvk),
+		clusterRoleBindingInformer.Lister().List,
 		func(name, _ string) *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration {
 			return rbacv1applyconfigurations.ClusterRoleBinding(name)
 		},
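
Note on the completion tracking above: both operators now keep a per-GVR `complete` map guarded by a lock, and each labelling sync handler receives a `done()` callback that marks its own GVR complete and reports whether every registered GVR is done. The following is a minimal, self-contained sketch of that pattern, not OLM's actual code; the `gvr` type, `completionTracker` name, and the use of a plain `sync.Mutex` (the patch uses a `sync.RWMutex` taken in write mode) are illustrative assumptions.

package main

import (
	"fmt"
	"sync"
)

// gvr is a hypothetical stand-in for schema.GroupVersionResource.
type gvr string

// completionTracker plays the role of the `complete` map plus `completeLock`
// closed over by labelObjects in the patch.
type completionTracker struct {
	mu       sync.Mutex // the patch uses sync.RWMutex; a Mutex suffices for this sketch
	complete map[gvr]bool
}

func newCompletionTracker() *completionTracker {
	return &completionTracker{complete: map[gvr]bool{}}
}

// register marks a GVR as participating and initially incomplete,
// mirroring `complete[gvr] = false` inside labelObjects.
func (t *completionTracker) register(g gvr) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.complete[g] = false
}

// done mirrors the closure handed to each sync handler: mark the caller's
// GVR complete, then report whether every registered GVR is now complete.
func (t *completionTracker) done(g gvr) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.complete[g] = true
	allDone := true
	for _, d := range t.complete {
		allDone = allDone && d
	}
	return allDone
}

func main() {
	tracker := newCompletionTracker()
	tracker.register("roles")
	tracker.register("rolebindings")
	fmt.Println(tracker.done("roles"))        // false: rolebindings still pending
	fmt.Println(tracker.done("rolebindings")) // true: every GVR has finished labelling
}

Registering every GVR up front (before any handler can report completion) is what makes the all-done check sound: a handler can never observe the map in a state where a not-yet-registered GVR is silently missing.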
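The other half of the mechanism lives in labels.go: when a sync handler sees an object that needs no labelling, it lists everything for its GVR and, if nothing is left to label, calls `done()`; once all GVRs report complete, the process exits via `logrus.Fatal` so the Pod that replaces it can start with filtered informers. A rough sketch of that "fully labelled" scan, under the same illustrative assumptions (`object`, `managedKey`, and `fullyLabelled` are hypothetical stand-ins, not OLM identifiers):

package main

import "fmt"

// object is a hypothetical stand-in for an object's metadata.
type object struct {
	name   string
	labels map[string]string
}

// managedKey/managedValue stand in for install.OLMManagedLabelKey/Value.
const (
	managedKey   = "olm.managed"
	managedValue = "true"
)

func hasLabel(o object) bool { return o.labels[managedKey] == managedValue }

// fullyLabelled mirrors the gvrFullyLabelled loop added in labels.go: the GVR
// is done once every listed object either falls outside the filter (check)
// or already carries the managed label.
func fullyLabelled(items []object, check func(object) bool) bool {
	for _, item := range items {
		if check(item) && !hasLabel(item) {
			return false // something still needs labelling; keep running
		}
	}
	return true
}

func main() {
	items := []object{
		{name: "a", labels: map[string]string{managedKey: managedValue}},
		{name: "b", labels: map[string]string{}},
	}
	check := func(object) bool { return true } // label everything in this sketch
	fmt.Println(fullyLabelled(items, check))   // false: "b" is still unlabelled
}

Exiting with `logrus.Fatal` relies on the Pod's restart policy to bring the operator back up, at which point `canFilter` short-circuits `labelObjects` entirely; that is why the handlers treat "everything already labelled" as a terminal condition rather than a no-op.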