Skip to content

Commit

Permalink
Remove Cluster and Pod finalizers
Browse files Browse the repository at this point in the history
Previous usage of finalizer handlers was unreliable in the case of
flipping Kubernetes Nodes ready status. Local SSD disks that could be
attached to Redpanda Pod prevents rescheduling as the Persistent Volume
affinity bounds Pod to only one Node. In case of Kubernetes Node coming
back to live Cluster controller could already delete Redpanda data (PVC
deletion and Redpanda decommissioning). If particular Redpanda Node would
host single replica partition, then it would be a data lost.

If the majority of Redpanda process would run in unstable Kubernetes
Nodes, then Redpanda operator could break whole cluster by losing Raft
quorum.

Reference

#112
redpanda-data/redpanda#6942
  • Loading branch information
RafalKorepta committed Jun 21, 2024
1 parent f81e11d commit 1d58db3
Showing 1 changed file with 40 additions and 167 deletions.
207 changes: 40 additions & 167 deletions src/go/k8s/internal/controller/redpanda/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ const (
SecretAnnotationExternalCAKey = "operator.redpanda.com/external-ca"

NotManaged = "false"

DecommissionOnDeleteAnnotation = "operator.redpanda.com/decommission-on-delete"
)

var (
Expand Down Expand Up @@ -134,25 +132,24 @@ func (r *ClusterReconciler) Reconcile(
return ctrl.Result{}, fmt.Errorf("unable to retrieve Cluster resource: %w", err)
}

isManaged := isRedpandaClusterManaged(log, &vectorizedCluster) && isRedpandaClusterVersionManaged(log, &vectorizedCluster, r.RestrictToRedpandaVersion)

// if the cluster is being deleted, or is no longer managed by the controller,
// delete the finalizers from the Cluster and its Pods
if !vectorizedCluster.GetDeletionTimestamp().IsZero() || !isManaged {
return r.handleClusterDeletion(ctx, &vectorizedCluster, log)
// Previous usage of finalizer handlers was unreliable in the case of
// flipping Kubernetes Nodes ready status. Local SSD disks that could be
// attached to Redpanda Pod prevents rescheduling as the Persistent Volume
// affinity bounds Pod to only one Node. In case of Kubernetes Node coming
// back to live Cluster controller could already delete Redpanda data (PVC
// deletion and Redpanda decommissioning). If particular Redpanda Node would
// host single replica partition, then it would be a data lost.
//
// If the majority of Redpanda process would run in unstable Kubernetes
// Nodes, then Redpanda operator could break whole cluster by losing Raft
// quorum.
if err := r.removeFinalizers(ctx, &vectorizedCluster, log); err != nil {
return ctrl.Result{}, err
}

// if the cluster isn't being deleted, add a finalizer
if !controllerutil.ContainsFinalizer(&vectorizedCluster, FinalizerKey) {
log.V(logger.DebugLevel).Info("adding finalizer")
controllerutil.AddFinalizer(&vectorizedCluster, FinalizerKey)
if err := r.Update(ctx, &vectorizedCluster); err != nil {
return ctrl.Result{}, fmt.Errorf("unable to set Cluster finalizer: %w", err)
}
}
// set a finalizer on the pods so we can have the data needed to decommission them
if err := r.handlePodFinalizer(ctx, &vectorizedCluster, log, ar); err != nil {
return ctrl.Result{}, fmt.Errorf("setting pod finalizer: %w", err)
isManaged := isRedpandaClusterManaged(log, &vectorizedCluster) && isRedpandaClusterVersionManaged(log, &vectorizedCluster, r.RestrictToRedpandaVersion)
if !isManaged {
return ctrl.Result{}, nil
}

ar.bootstrapService()
Expand Down Expand Up @@ -328,144 +325,16 @@ func validateImagePullPolicy(imagePullPolicy corev1.PullPolicy) error {
return nil
}

//nolint:funlen,gocyclo // refactor in the next iteration
func (r *ClusterReconciler) handlePodFinalizer(
ctx context.Context, rp *vectorizedv1alpha1.Cluster, l logr.Logger, ar *attachedResources,
) error {
log := l.WithName("handlePodFinalizer")
pods, err := r.podList(ctx, rp)
if err != nil {
return fmt.Errorf("unable to fetch PodList: %w", err)
}

var decommissionOnDelete bool
decommissionOnDeleteVal, ok := rp.Annotations[DecommissionOnDeleteAnnotation]
if ok {
decommissionOnDelete, err = strconv.ParseBool(decommissionOnDeleteVal)
if err != nil {
//nolint:goerr113 // not going to use wrapped static error here this time
return fmt.Errorf("value of annotation operator.redpanda.com/decommission-on-delete must be convertable to boolean")
}
}

for i := range pods.Items {
pod := &pods.Items[i]
if pod.DeletionTimestamp.IsZero() {
// if the pod is not being deleted, set the finalizer
if err = r.setPodFinalizer(ctx, pod, log); err != nil {
//nolint:goerr113 // not going to use wrapped static error here this time
return fmt.Errorf(`unable to set the finalizer on pod "%s": %w`, pod.Name, err)
}
continue
}
// if the pod is being deleted
// check the node it's assigned to
node := corev1.Node{}
key := types.NamespacedName{Name: pod.Spec.NodeName}
err := r.Get(ctx, key, &node)
// if the node is not gone
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf(`unable to fetch node "%s": %w`, pod.Spec.NodeName, err)
}
if err == nil {
// nor has a noexecute taint
untainted := true
for _, taint := range node.Spec.Taints {
if (taint.Effect == corev1.TaintEffectNoExecute && taint.Key == corev1.TaintNodeUnreachable) || (decommissionOnDelete && taint.Effect == corev1.TaintEffectNoSchedule) {
untainted = false
}
}
if untainted {
// remove the finalizer and let the pod be restarted
if err = r.removePodFinalizer(ctx, pod, log); err != nil {
return fmt.Errorf(`unable to remove finalizer from pod "%s": %w`, pod.Name, err)
}
continue
}
}
// get the node id
nodeIDStr, ok := pod.GetAnnotations()[resources.PodAnnotationNodeIDKey]
if !ok {
return fmt.Errorf("cannot determine node_id for pod %s: %w. not removing finalizer", pod.Name, err)
}
nodeID, err := strconv.Atoi(nodeIDStr)
if err != nil {
return fmt.Errorf("node-id annotation is not an integer: %w", err)
}

pki, err := ar.getPKI()
if err != nil {
return fmt.Errorf("getting pki: %w", err)
}

adminClient, err := r.AdminAPIClientFactory(ctx, r.Client, rp, ar.getHeadlessServiceFQDN(), pki.AdminAPIConfigProvider())
if err != nil {
return fmt.Errorf("unable to create admin client: %w", err)
}
brokers, err := adminClient.Brokers(ctx)
if err != nil {
return fmt.Errorf("unable to fetch brokers: %w", err)
}
// check if the node in the broker list
var broker *admin.Broker
for i := range brokers {
if brokers[i].NodeID == nodeID {
broker = &brokers[i]
break
}
}
// if it's not gone
if broker != nil {
// decommission it
log.WithValues("node-id", nodeID).Info("decommissioning broker")
if err = adminClient.DecommissionBroker(ctx, nodeID); err != nil {
return fmt.Errorf(`unable to decommission node "%d": %w`, nodeID, err)
}
}

if !r.allowPVCDeletion {
// remove the finalizer
if err = r.removePodFinalizer(ctx, pod, log); err != nil {
return fmt.Errorf(`unable to remove finalizer from pod "%s/%s: %w"`, pod.GetNamespace(), pod.GetName(), err)
}
return nil
}
// delete the associated pvc
if err = utils.DeletePodPVCs(ctx, r.Client, pod, log); err != nil {
return fmt.Errorf(`unable to remove VPCs for pod "%s/%s: %w"`, pod.GetNamespace(), pod.GetName(), err)
}

// remove the finalizer
if err := r.removePodFinalizer(ctx, pod, log); err != nil {
return fmt.Errorf(`unable to remove finalizer from pod "%s/%s: %w"`, pod.GetNamespace(), pod.GetName(), err)
}
}
return nil
}

func (r *ClusterReconciler) removePodFinalizer(
ctx context.Context, pod *corev1.Pod, l logr.Logger,
) error {
log := l.WithName("removePodFinalizer")
if controllerutil.ContainsFinalizer(pod, FinalizerKey) {
log.V(logger.DebugLevel).WithValues("namespace", pod.Namespace, "name", pod.Name).Info("removing finalizer")
patch := client.MergeFrom(pod.DeepCopy())
controllerutil.RemoveFinalizer(pod, FinalizerKey)
if err := r.Update(ctx, pod); err != nil {
return err
}
}
return nil
}

func (r *ClusterReconciler) setPodFinalizer(
ctx context.Context, pod *corev1.Pod, l logr.Logger,
) error {
log := l.WithName("setPodFinalizer")
if !controllerutil.ContainsFinalizer(pod, FinalizerKey) {
log.V(logger.DebugLevel).WithValues("namespace", pod.Namespace, "name", pod.Name).Info("adding finalizer")
controllerutil.AddFinalizer(pod, FinalizerKey)
if err := r.Update(ctx, pod); err != nil {
return err
if err := r.Patch(ctx, pod, patch); err != nil {
return fmt.Errorf("unable to remove pod (%s/%s) finalizer: %w", pod.Namespace, pod.Name, err)
}
}
return nil
Expand Down Expand Up @@ -903,28 +772,32 @@ func (r *ClusterReconciler) createExternalNodesList(
return result, nil
}

func (r *ClusterReconciler) handleClusterDeletion(
func (r *ClusterReconciler) removeFinalizers(
ctx context.Context, redpandaCluster *vectorizedv1alpha1.Cluster, l logr.Logger,
) (reconcile.Result, error) {
log := l.WithName("handleClusterDeletion")
log.V(logger.DebugLevel).Info("handling cluster deletion")
) error {
log := l.WithName("removeFinalizers")
log.V(logger.DebugLevel).Info("handling finalizer removal")

if controllerutil.ContainsFinalizer(redpandaCluster, FinalizerKey) {
log.V(logger.DebugLevel).Info("removing finalizers")
pods, err := r.podList(ctx, redpandaCluster)
if err != nil {
return ctrl.Result{}, fmt.Errorf("unable to list Pods: %w", err)
}
for i := range pods.Items {
if err := r.removePodFinalizer(ctx, &pods.Items[i], log); err != nil {
return ctrl.Result{}, fmt.Errorf(`unable to remove finalizer for pod "%s": %w`, pods.Items[i].GetName(), err)
}
}
log.V(logger.DebugLevel).Info("removing finalizers from cluster custom resource")
patch := client.MergeFrom(redpandaCluster.DeepCopy())
controllerutil.RemoveFinalizer(redpandaCluster, FinalizerKey)
if err := r.Update(ctx, redpandaCluster); err != nil {
return ctrl.Result{}, fmt.Errorf("unable to remove Cluster finalizer: %w", err)
if err := r.Patch(ctx, redpandaCluster, patch); err != nil {
return fmt.Errorf("unable to remove Cluster finalizer: %w", err)
}
}
return ctrl.Result{}, nil

pods, err := r.podList(ctx, redpandaCluster)
if err != nil {
return fmt.Errorf("unable to list Pods: %w", err)
}

for i := range pods.Items {
if err := r.removePodFinalizer(ctx, &pods.Items[i], log); err != nil {
return fmt.Errorf(`unable to remove finalizer for pod "%s": %w`, pods.Items[i].GetName(), err)
}
}
return nil
}

// setInitialSuperUserPassword should be idempotent, create user if not found, updates if found
Expand Down

0 comments on commit 1d58db3

Please sign in to comment.