(fix) registry pods do not come up again after node failure (#3366)
[PR 3201](#3201) attempted to solve the issue by deleting pods stuck in
`Terminating` due to an unreachable node. However, the logic to do that was
included in `EnsureRegistryServer`, which only gets executed if polling is
requested by the user.

This PR moves the logic of checking for dead pods out of `EnsureRegistryServer`
and into `CheckRegistryServer` instead. This way, if any dead pods are detected
during `CheckRegistryServer`, `healthy` is returned as `false`, which in turn
triggers `EnsureRegistryServer`.
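
For context, a minimal sketch of the check-then-ensure flow this fix relies on. `CheckRegistryServer` and `EnsureRegistryServer` are the reconciler methods touched by this PR; the `syncRegistry` wrapper and its wiring are illustrative assumptions, not OLM's actual catalog-source syncer:

```go
package example

import (
	"github.com/operator-framework/api/pkg/operators/v1alpha1"
	"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
	"github.com/sirupsen/logrus"
)

// syncRegistry condenses the sync path: an unhealthy check result, not
// user-configured polling, is what drives re-creation of the registry server.
func syncRegistry(logger *logrus.Entry, rec reconciler.RegistryReconciler, catsrc *v1alpha1.CatalogSource) error {
	healthy, err := rec.CheckRegistryServer(logger, catsrc)
	if err != nil {
		return err
	}
	if healthy {
		return nil
	}
	// With this PR, a registry pod stuck in Terminating on a failed node is
	// force deleted inside CheckRegistryServer, which then reports
	// healthy == false, so EnsureRegistryServer runs and brings the pod back.
	return rec.EnsureRegistryServer(logger, catsrc)
}
```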
anik120 authored Aug 30, 2024
1 parent cd1364f commit f243189
Showing 4 changed files with 128 additions and 36 deletions.
50 changes: 40 additions & 10 deletions pkg/controller/registry/reconciler/configmap.go
@@ -3,18 +3,20 @@ package reconciler

import (
"context"
"errors"
"fmt"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/install"
hashutil "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubernetes/pkg/util/hash"
"github.com/pkg/errors"
pkgerrors "github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"

"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
@@ -327,27 +329,27 @@ func (c *ConfigMapRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry,

//TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated)
if err := c.ensureServiceAccount(source, overwrite); err != nil {
return errors.Wrapf(err, "error ensuring service account: %s", source.serviceAccountName())
return pkgerrors.Wrapf(err, "error ensuring service account: %s", source.serviceAccountName())
}
if err := c.ensureRole(source, overwrite); err != nil {
return errors.Wrapf(err, "error ensuring role: %s", source.roleName())
return pkgerrors.Wrapf(err, "error ensuring role: %s", source.roleName())
}
if err := c.ensureRoleBinding(source, overwrite); err != nil {
return errors.Wrapf(err, "error ensuring rolebinding: %s", source.RoleBinding().GetName())
return pkgerrors.Wrapf(err, "error ensuring rolebinding: %s", source.RoleBinding().GetName())
}
pod, err := source.Pod(image, defaultPodSecurityConfig)
if err != nil {
return err
}
if err := c.ensurePod(source, defaultPodSecurityConfig, overwritePod); err != nil {
return errors.Wrapf(err, "error ensuring pod: %s", pod.GetName())
return pkgerrors.Wrapf(err, "error ensuring pod: %s", pod.GetName())
}
service, err := source.Service()
if err != nil {
return err
}
if err := c.ensureService(source, overwrite); err != nil {
return errors.Wrapf(err, "error ensuring service: %s", service.GetName())
return pkgerrors.Wrapf(err, "error ensuring service: %s", service.GetName())
}

if overwritePod {
@@ -420,15 +422,15 @@ func (c *ConfigMapRegistryReconciler) ensurePod(source configMapCatalogSourceDec
}
for _, p := range currentPods {
if err := c.OpClient.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).Delete(context.TODO(), p.GetName(), *metav1.NewDeleteOptions(1)); err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error deleting old pod: %s", p.GetName())
return pkgerrors.Wrapf(err, "error deleting old pod: %s", p.GetName())
}
}
}
_, err = c.OpClient.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).Create(context.TODO(), pod, metav1.CreateOptions{})
if err == nil {
return nil
}
return errors.Wrapf(err, "error creating new pod: %s", pod.GetGenerateName())
return pkgerrors.Wrapf(err, "error creating new pod: %s", pod.GetGenerateName())
}

func (c *ConfigMapRegistryReconciler) ensureService(source configMapCatalogSourceDecorator, overwrite bool) error {
@@ -512,6 +514,34 @@ func (c *ConfigMapRegistryReconciler) CheckRegistryServer(logger *logrus.Entry,
return
}

healthy = true
return
podsAreLive, e := detectAndDeleteDeadPods(logger, c.OpClient, pods, source.GetNamespace())
if e != nil {
return false, fmt.Errorf("error deleting dead pods: %v", e)
}
return podsAreLive, nil
}

// detectAndDeleteDeadPods determines if there are registry client pods that are in the deleted state
// but have not been removed by GC (eg the node goes down before GC can remove them), and attempts to
// force delete the pods. If there are live registry pods remaining, it returns true, otherwise returns false.
func detectAndDeleteDeadPods(logger *logrus.Entry, client operatorclient.ClientInterface, pods []*corev1.Pod, sourceNamespace string) (bool, error) {
var forceDeletionErrs []error
livePodFound := false
for _, pod := range pods {
if !isPodDead(pod) {
livePodFound = true
logger.WithFields(logrus.Fields{"pod.namespace": sourceNamespace, "pod.name": pod.GetName()}).Debug("pod is alive")
continue
}
logger.WithFields(logrus.Fields{"pod.namespace": sourceNamespace, "pod.name": pod.GetName()}).Info("force deleting dead pod")
if err := client.KubernetesInterface().CoreV1().Pods(sourceNamespace).Delete(context.TODO(), pod.GetName(), metav1.DeleteOptions{
GracePeriodSeconds: ptr.To[int64](0),
}); err != nil && !apierrors.IsNotFound(err) {
forceDeletionErrs = append(forceDeletionErrs, err)
}
}
if len(forceDeletionErrs) > 0 {
return false, errors.Join(forceDeletionErrs...)
}
return livePodFound, nil
}
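
The new code above calls `isPodDead`, which this diff does not show. A plausible sketch of that check, inferred from the `withPodDeletedButNotRemoved` test fixture further down — treat the exact conditions as an assumption rather than the committed implementation:

```go
package reconciler

import corev1 "k8s.io/api/core/v1"

// isPodDead (sketch): a pod evicted from an unreachable node by the taint
// manager keeps a deletion timestamp and carries a DisruptionTarget condition
// with reason DeletionByTaintManager while it is stuck in Terminating.
func isPodDead(pod *corev1.Pod) bool {
	if pod.DeletionTimestamp == nil {
		return false
	}
	for _, cond := range pod.Status.Conditions {
		if cond.Type == corev1.DisruptionTarget &&
			cond.Status == corev1.ConditionTrue &&
			cond.Reason == "DeletionByTaintManager" {
			return true
		}
	}
	return false
}
```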
52 changes: 52 additions & 0 deletions pkg/controller/registry/reconciler/configmap_test.go
@@ -527,3 +527,55 @@ func TestConfigMapRegistryReconciler(t *testing.T) {
})
}
}

func TestConfigMapRegistryChecker(t *testing.T) {
validConfigMap := validConfigMap()
validCatalogSource := validConfigMapCatalogSource(validConfigMap)
type cluster struct {
k8sObjs []runtime.Object
}
type in struct {
cluster cluster
catsrc *v1alpha1.CatalogSource
}
type out struct {
healthy bool
err error
}
tests := []struct {
testName string
in in
out out
}{
{
testName: "ConfigMap/ExistingRegistry/DeadPod",
in: in{
cluster: cluster{
k8sObjs: append(withPodDeletedButNotRemoved(objectsForCatalogSource(t, validCatalogSource)), validConfigMap),
},
catsrc: validCatalogSource,
},
out: out{
healthy: false,
},
},
}
for _, tt := range tests {
t.Run(tt.testName, func(t *testing.T) {
stopc := make(chan struct{})
defer close(stopc)

factory, _ := fakeReconcilerFactory(t, stopc, withK8sObjs(tt.in.cluster.k8sObjs...))
rec := factory.ReconcilerForSource(tt.in.catsrc)

healthy, err := rec.CheckRegistryServer(logrus.NewEntry(logrus.New()), tt.in.catsrc)

require.Equal(t, tt.out.err, err)
if tt.out.err != nil {
return
}

require.Equal(t, tt.out.healthy, healthy)
})
}
}
33 changes: 7 additions & 26 deletions pkg/controller/registry/reconciler/grpc.go
@@ -2,9 +2,7 @@ package reconciler

import (
"context"
"errors"
"fmt"
"slices"
"strings"
"time"

@@ -24,7 +22,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"
)

const (
@@ -348,25 +345,6 @@ func isRegistryServiceStatusValid(source *grpcCatalogSourceDecorator) (bool, err
func (c *GrpcRegistryReconciler) ensurePod(logger *logrus.Entry, source grpcCatalogSourceDecorator, serviceAccount *corev1.ServiceAccount, defaultPodSecurityConfig v1alpha1.SecurityConfig, overwrite bool) error {
// currentPods refers to the current pod instances of the catalog source
currentPods := c.currentPods(logger, source)

var forceDeleteErrs []error
currentPods = slices.DeleteFunc(currentPods, func(pod *corev1.Pod) bool {
if !isPodDead(pod) {
logger.WithFields(logrus.Fields{"pod.namespace": source.GetNamespace(), "pod.name": pod.GetName()}).Debug("pod is alive")
return false
}
logger.WithFields(logrus.Fields{"pod.namespace": source.GetNamespace(), "pod.name": pod.GetName()}).Info("force deleting dead pod")
if err := c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Delete(context.TODO(), pod.GetName(), metav1.DeleteOptions{
GracePeriodSeconds: ptr.To[int64](0),
}); err != nil && !apierrors.IsNotFound(err) {
forceDeleteErrs = append(forceDeleteErrs, pkgerrors.Wrapf(err, "error deleting old pod: %s", pod.GetName()))
}
return true
})
if len(forceDeleteErrs) > 0 {
return errors.Join(forceDeleteErrs...)
}

if len(currentPods) > 0 {
if !overwrite {
return nil
@@ -628,16 +606,19 @@ func (c *GrpcRegistryReconciler) CheckRegistryServer(logger *logrus.Entry, catal
if err != nil {
return false, err
}
current, err := c.currentPodsWithCorrectImageAndSpec(logger, source, serviceAccount, registryPodSecurityConfig)
currentPods, err := c.currentPodsWithCorrectImageAndSpec(logger, source, serviceAccount, registryPodSecurityConfig)
if err != nil {
return false, err
}
if len(current) < 1 ||
if len(currentPods) < 1 ||
service == nil || c.currentServiceAccount(source) == nil {
return false, nil
}

return true, nil
podsAreLive, e := detectAndDeleteDeadPods(logger, c.OpClient, currentPods, source.GetNamespace())
if e != nil {
return false, fmt.Errorf("error deleting dead pods: %v", e)
}
return podsAreLive, nil
}

// promoteCatalog swaps the labels on the update pod so that the update pod is now reachable by the catalog service.
29 changes: 29 additions & 0 deletions pkg/controller/registry/reconciler/grpc_test.go
@@ -70,6 +70,23 @@ func grpcCatalogSourceWithName(name string) *v1alpha1.CatalogSource {
return catsrc
}

func withPodDeletedButNotRemoved(objs []runtime.Object) []runtime.Object {
var out []runtime.Object
for _, obj := range objs {
o := obj.DeepCopyObject()
// Mutate the deep copy rather than the caller's object, so shared test fixtures are not altered.
if pod, ok := o.(*corev1.Pod); ok {
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{
Type: corev1.DisruptionTarget,
Reason: "DeletionByTaintManager",
Status: corev1.ConditionTrue,
})
}
out = append(out, o)
}
return out
}

func TestGrpcRegistryReconciler(t *testing.T) {
now := func() metav1.Time { return metav1.Date(2018, time.January, 26, 20, 40, 0, 0, time.UTC) }
blockOwnerDeletion := true
@@ -558,6 +575,18 @@ func TestGrpcRegistryChecker(t *testing.T) {
healthy: false,
},
},
{
testName: "Grpc/ExistingRegistry/Image/DeadPod",
in: in{
cluster: cluster{
k8sObjs: withPodDeletedButNotRemoved(objectsForCatalogSource(t, validGrpcCatalogSource("test-img", ""))),
},
catsrc: validGrpcCatalogSource("test-img", ""),
},
out: out{
healthy: false,
},
},
{
testName: "Grpc/ExistingRegistry/Image/OldPod/NotHealthy",
in: in{
