diff --git a/pkg/controller/registry/reconciler/configmap.go b/pkg/controller/registry/reconciler/configmap.go index 2e47c56507..f9a63162a8 100644 --- a/pkg/controller/registry/reconciler/configmap.go +++ b/pkg/controller/registry/reconciler/configmap.go @@ -3,11 +3,12 @@ package reconciler import ( "context" + "errors" "fmt" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/install" hashutil "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubernetes/pkg/util/hash" - "github.com/pkg/errors" + pkgerrors "github.com/pkg/errors" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -15,6 +16,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/ptr" "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient" @@ -327,27 +329,27 @@ func (c *ConfigMapRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry, //TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated) if err := c.ensureServiceAccount(source, overwrite); err != nil { - return errors.Wrapf(err, "error ensuring service account: %s", source.serviceAccountName()) + return pkgerrors.Wrapf(err, "error ensuring service account: %s", source.serviceAccountName()) } if err := c.ensureRole(source, overwrite); err != nil { - return errors.Wrapf(err, "error ensuring role: %s", source.roleName()) + return pkgerrors.Wrapf(err, "error ensuring role: %s", source.roleName()) } if err := c.ensureRoleBinding(source, overwrite); err != nil { - return errors.Wrapf(err, "error ensuring rolebinding: %s", source.RoleBinding().GetName()) + return pkgerrors.Wrapf(err, "error ensuring rolebinding: %s", source.RoleBinding().GetName()) } pod, err := source.Pod(image, defaultPodSecurityConfig) if err != nil { return 
err } if err := c.ensurePod(source, defaultPodSecurityConfig, overwritePod); err != nil { - return errors.Wrapf(err, "error ensuring pod: %s", pod.GetName()) + return pkgerrors.Wrapf(err, "error ensuring pod: %s", pod.GetName()) } service, err := source.Service() if err != nil { return err } if err := c.ensureService(source, overwrite); err != nil { - return errors.Wrapf(err, "error ensuring service: %s", service.GetName()) + return pkgerrors.Wrapf(err, "error ensuring service: %s", service.GetName()) } if overwritePod { @@ -420,7 +422,7 @@ func (c *ConfigMapRegistryReconciler) ensurePod(source configMapCatalogSourceDec } for _, p := range currentPods { if err := c.OpClient.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).Delete(context.TODO(), p.GetName(), *metav1.NewDeleteOptions(1)); err != nil && !apierrors.IsNotFound(err) { - return errors.Wrapf(err, "error deleting old pod: %s", p.GetName()) + return pkgerrors.Wrapf(err, "error deleting old pod: %s", p.GetName()) } } } @@ -428,7 +430,7 @@ func (c *ConfigMapRegistryReconciler) ensurePod(source configMapCatalogSourceDec if err == nil { return nil } - return errors.Wrapf(err, "error creating new pod: %s", pod.GetGenerateName()) + return pkgerrors.Wrapf(err, "error creating new pod: %s", pod.GetGenerateName()) } func (c *ConfigMapRegistryReconciler) ensureService(source configMapCatalogSourceDecorator, overwrite bool) error { @@ -512,6 +514,34 @@ func (c *ConfigMapRegistryReconciler) CheckRegistryServer(logger *logrus.Entry, return } - healthy = true - return + podsAreLive, e := detectAndDeleteDeadPods(logger, c.OpClient, pods, source.GetNamespace()) + if e != nil { + return false, fmt.Errorf("error deleting dead pods: %v", e) + } + return podsAreLive, nil +} + +// detectAndDeleteDeadPods determines if there are registry client pods that are in the deleted state +// but have not been removed by GC (eg the node goes down before GC can remove them), and attempts to +// force delete the pods. 
If there are live registry pods remaining, it returns true, otherwise returns false. +func detectAndDeleteDeadPods(logger *logrus.Entry, client operatorclient.ClientInterface, pods []*corev1.Pod, sourceNamespace string) (bool, error) { + var forceDeletionErrs []error + livePodFound := false + for _, pod := range pods { + if !isPodDead(pod) { + livePodFound = true + logger.WithFields(logrus.Fields{"pod.namespace": sourceNamespace, "pod.name": pod.GetName()}).Debug("pod is alive") + continue + } + logger.WithFields(logrus.Fields{"pod.namespace": sourceNamespace, "pod.name": pod.GetName()}).Info("force deleting dead pod") + if err := client.KubernetesInterface().CoreV1().Pods(sourceNamespace).Delete(context.TODO(), pod.GetName(), metav1.DeleteOptions{ + GracePeriodSeconds: ptr.To[int64](0), + }); err != nil && !apierrors.IsNotFound(err) { + forceDeletionErrs = append(forceDeletionErrs, err) + } + } + if len(forceDeletionErrs) > 0 { + return false, errors.Join(forceDeletionErrs...) + } + return livePodFound, nil } diff --git a/pkg/controller/registry/reconciler/configmap_test.go b/pkg/controller/registry/reconciler/configmap_test.go index 9635a5b59b..a8c1dcb9d8 100644 --- a/pkg/controller/registry/reconciler/configmap_test.go +++ b/pkg/controller/registry/reconciler/configmap_test.go @@ -527,3 +527,55 @@ func TestConfigMapRegistryReconciler(t *testing.T) { }) } } + +func TestConfigMapRegistryChecker(t *testing.T) { + validConfigMap := validConfigMap() + validCatalogSource := validConfigMapCatalogSource(validConfigMap) + type cluster struct { + k8sObjs []runtime.Object + } + type in struct { + cluster cluster + catsrc *v1alpha1.CatalogSource + } + type out struct { + healthy bool + err error + } + tests := []struct { + testName string + in in + out out + }{ + { + testName: "ConfigMap/ExistingRegistry/DeadPod", + in: in{ + cluster: cluster{ + k8sObjs: append(withPodDeletedButNotRemoved(objectsForCatalogSource(t, validCatalogSource)), validConfigMap), + }, + catsrc: 
validCatalogSource, + }, + out: out{ + healthy: false, + }, + }, + } + for _, tt := range tests { + t.Run(tt.testName, func(t *testing.T) { + stopc := make(chan struct{}) + defer close(stopc) + + factory, _ := fakeReconcilerFactory(t, stopc, withK8sObjs(tt.in.cluster.k8sObjs...)) + rec := factory.ReconcilerForSource(tt.in.catsrc) + + healthy, err := rec.CheckRegistryServer(logrus.NewEntry(logrus.New()), tt.in.catsrc) + + require.Equal(t, tt.out.err, err) + if tt.out.err != nil { + return + } + + require.Equal(t, tt.out.healthy, healthy) + }) + } +} diff --git a/pkg/controller/registry/reconciler/grpc.go b/pkg/controller/registry/reconciler/grpc.go index 30cf12f1d2..ec0bdad10c 100644 --- a/pkg/controller/registry/reconciler/grpc.go +++ b/pkg/controller/registry/reconciler/grpc.go @@ -2,9 +2,7 @@ package reconciler import ( "context" - "errors" "fmt" - "slices" "strings" "time" @@ -24,7 +22,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/utils/ptr" ) const ( @@ -348,25 +345,6 @@ func isRegistryServiceStatusValid(source *grpcCatalogSourceDecorator) (bool, err func (c *GrpcRegistryReconciler) ensurePod(logger *logrus.Entry, source grpcCatalogSourceDecorator, serviceAccount *corev1.ServiceAccount, defaultPodSecurityConfig v1alpha1.SecurityConfig, overwrite bool) error { // currentPods refers to the current pod instances of the catalog source currentPods := c.currentPods(logger, source) - - var forceDeleteErrs []error - currentPods = slices.DeleteFunc(currentPods, func(pod *corev1.Pod) bool { - if !isPodDead(pod) { - logger.WithFields(logrus.Fields{"pod.namespace": source.GetNamespace(), "pod.name": pod.GetName()}).Debug("pod is alive") - return false - } - logger.WithFields(logrus.Fields{"pod.namespace": source.GetNamespace(), "pod.name": pod.GetName()}).Info("force deleting dead pod") - if err := 
c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Delete(context.TODO(), pod.GetName(), metav1.DeleteOptions{ - GracePeriodSeconds: ptr.To[int64](0), - }); err != nil && !apierrors.IsNotFound(err) { - forceDeleteErrs = append(forceDeleteErrs, pkgerrors.Wrapf(err, "error deleting old pod: %s", pod.GetName())) - } - return true - }) - if len(forceDeleteErrs) > 0 { - return errors.Join(forceDeleteErrs...) - } - if len(currentPods) > 0 { if !overwrite { return nil @@ -628,16 +606,19 @@ func (c *GrpcRegistryReconciler) CheckRegistryServer(logger *logrus.Entry, catal if err != nil { return false, err } - current, err := c.currentPodsWithCorrectImageAndSpec(logger, source, serviceAccount, registryPodSecurityConfig) + currentPods, err := c.currentPodsWithCorrectImageAndSpec(logger, source, serviceAccount, registryPodSecurityConfig) if err != nil { return false, err } - if len(current) < 1 || + if len(currentPods) < 1 || service == nil || c.currentServiceAccount(source) == nil { return false, nil } - - return true, nil + podsAreLive, e := detectAndDeleteDeadPods(logger, c.OpClient, currentPods, source.GetNamespace()) + if e != nil { + return false, fmt.Errorf("error deleting dead pods: %v", e) + } + return podsAreLive, nil } // promoteCatalog swaps the labels on the update pod so that the update pod is now reachable by the catalog service. 
diff --git a/pkg/controller/registry/reconciler/grpc_test.go b/pkg/controller/registry/reconciler/grpc_test.go index 65071feda5..37bd73aace 100644 --- a/pkg/controller/registry/reconciler/grpc_test.go +++ b/pkg/controller/registry/reconciler/grpc_test.go @@ -70,6 +70,23 @@ func grpcCatalogSourceWithName(name string) *v1alpha1.CatalogSource { return catsrc } +func withPodDeletedButNotRemoved(objs []runtime.Object) []runtime.Object { + var out []runtime.Object + for _, obj := range objs { + o := obj.DeepCopyObject() + if pod, ok := o.(*corev1.Pod); ok { + pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} + pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{ + Type: corev1.DisruptionTarget, + Reason: "DeletionByTaintManager", + Status: corev1.ConditionTrue, + }) + o = pod + } + out = append(out, o) + } + return out +} func TestGrpcRegistryReconciler(t *testing.T) { now := func() metav1.Time { return metav1.Date(2018, time.January, 26, 20, 40, 0, 0, time.UTC) } blockOwnerDeletion := true @@ -558,6 +575,18 @@ func TestGrpcRegistryChecker(t *testing.T) { healthy: false, }, }, + { + testName: "Grpc/ExistingRegistry/Image/DeadPod", + in: in{ + cluster: cluster{ + k8sObjs: withPodDeletedButNotRemoved(objectsForCatalogSource(t, validGrpcCatalogSource("test-img", ""))), + }, + catsrc: validGrpcCatalogSource("test-img", ""), + }, + out: out{ + healthy: false, + }, + }, { testName: "Grpc/ExistingRegistry/Image/OldPod/NotHealthy", in: in{