Decommission brokers when Redpanda reports additional not alive nodes
By reducing the broker list to only the ones that are not running as a Pod and
are reported as not alive.
RafalKorepta committed Jul 2, 2024
1 parent 1d343d9 commit 9fa7a78
Showing 1 changed file with 84 additions and 39 deletions.
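The filtering rule the new code applies can be read directly from the diff: a broker is only considered a decommission candidate when it has no backing Pod, is still an active cluster member, and Redpanda reports it as not alive. Below is a minimal, self-contained sketch of that predicate, using simplified local types in place of the operator's `admin.Broker` (the field names NodeID, MembershipStatus and IsAlive follow the diff; everything else is illustrative):

```go
package main

import "fmt"

// broker is a simplified stand-in for the admin.Broker type used in the diff.
type broker struct {
	NodeID           int
	MembershipStatus string
	IsAlive          *bool
}

const membershipActive = "active"

// ghostBrokers returns the IDs of brokers that Redpanda still reports as
// active cluster members but that have no backing Pod and are not alive.
func ghostBrokers(brokers []broker, podBrokerIDs map[int]struct{}) []int {
	var down []int
	for _, b := range brokers {
		if _, inK8s := podBrokerIDs[b.NodeID]; inK8s {
			continue // broker is backed by a running Pod, never a ghost
		}
		if b.MembershipStatus == membershipActive && b.IsAlive != nil && !*b.IsAlive {
			down = append(down, b.NodeID)
		}
	}
	return down
}

func main() {
	alive, dead := true, false
	brokers := []broker{
		{NodeID: 0, MembershipStatus: membershipActive, IsAlive: &alive},
		{NodeID: 1, MembershipStatus: membershipActive, IsAlive: &dead}, // no Pod backing this broker
	}
	pods := map[int]struct{}{0: {}} // only broker 0 has a running Pod
	fmt.Println(ghostBrokers(brokers, pods)) // [1]
}
```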
123 changes: 84 additions & 39 deletions src/go/k8s/internal/controller/redpanda/cluster_controller.go
@@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/util/retry"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -269,12 +270,10 @@ func (r *ClusterReconciler) Reconcile(
RequeueAfter: 4 * time.Second,
}, nil
}

if err := r.setPodNodeIDLabel(ctx, &vectorizedCluster, log, ar); err != nil {
return ctrl.Result{}, fmt.Errorf("setting pod node_id label: %w", err)
}
if err := r.decommissionGhostBrokers(ctx, &vectorizedCluster, log, ar); err != nil {
return ctrl.Result{}, fmt.Errorf("deleting ghost brokers: %w", err)
}

// want: refactor above to resources (i.e. setInitialSuperUserPassword, reconcileConfiguration)
// ensuring license must be at the end when condition ClusterConfigured=true and AdminAPI is ready
@@ -288,6 +287,10 @@
return ctrl.Result{}, err
}

if r.GhostDecommissioning {
r.decommissionGhostBrokers(ctx, &vectorizedCluster, log, ar)
}

return ctrl.Result{}, nil
}

@@ -821,77 +824,119 @@ func (r *ClusterReconciler) setInitialSuperUserPassword(
return utilerrors.NewAggregate(errs)
}

// decommissionGhostBrokers decommissions brokers that Redpanda thinks exist, but aren't assigned to any pods
// This is not a reversible process. If creating a new broker due to an empty disk was a mistake, the data
// that was on that disk will be unusable.
func (r *ClusterReconciler) decommissionGhostBrokers(c context.Context, vCluster *vectorizedv1alpha1.Cluster, l logr.Logger, ar *attachedResources) error {
if r.GhostDecommissioning {
return r.doDecommissionGhostBrokers(c, vCluster, l, ar)
}
return nil
}

// custom error to satisfy err113
type missingBrokerIDError struct{}

func (m *missingBrokerIDError) Error() string {
return "a pod is temporarily missing the broker-id annotation"
}

func (r *ClusterReconciler) doDecommissionGhostBrokers(c context.Context, vCluster *vectorizedv1alpha1.Cluster, l logr.Logger, ar *attachedResources) error {
ctx, done := context.WithCancel(c)
defer done()
log := l.WithName("deleteGhostBrokers")
log.V(logger.DebugLevel).Info("deleting ghost brokers")
// rpBrokerList returns Redpanda's view of the registered brokers. The cluster health overview is
// returned only as additional information for debugging.
func (r *ClusterReconciler) rpBrokerList(ctx context.Context, vCluster *vectorizedv1alpha1.Cluster, ar *attachedResources) (adminutils.AdminAPIClient, []admin.Broker, *admin.ClusterHealthOverview, error) {
pki, err := ar.getPKI()
if err != nil {
return nil, nil, nil, fmt.Errorf("getting pki: %w", err)
}

pods, err := r.podList(ctx, vCluster)
adminClient, err := r.AdminAPIClientFactory(ctx, r.Client, vCluster, ar.getHeadlessServiceFQDN(), pki.AdminAPIConfigProvider())
if err != nil {
return fmt.Errorf("unable to fetch PodList: %w", err)
return nil, nil, nil, fmt.Errorf("creating admin client: %w", err)
}

pki, err := ar.getPKI()
bl, err := adminClient.Brokers(ctx)
if err != nil {
return fmt.Errorf("getting pki: %w", err)
return nil, nil, nil, fmt.Errorf("retrieving broker list: %w", err)
}

adminClient, err := r.AdminAPIClientFactory(ctx, r.Client, vCluster, ar.getHeadlessServiceFQDN(), pki.AdminAPIConfigProvider())
// Return the broker list even if retrieving the cluster health overview fails
health, _ := adminClient.GetHealthOverview(ctx)

return adminClient, bl, &health, nil
}

// decommissionGhostBrokers calculates the difference between the broker IDs reported
// in the Pod annotations and the broker list returned by the Redpanda Admin API. Any
// broker that Redpanda reports as not alive will be decommissioned.
// This is not a reversible process. If creating a new broker due to an empty disk was a mistake, the data
// that was on that disk will be unusable.
// This function will not requeue the Cluster reconciliation loop.
func (r *ClusterReconciler) decommissionGhostBrokers(c context.Context, vCluster *vectorizedv1alpha1.Cluster, l logr.Logger, ar *attachedResources) {
ctx, done := context.WithCancel(c)
defer done()
log := l.WithName("doDecommissionGhostBrokers")
log.Info("deleting ghost brokers")

adminClient, bl, health, err := r.rpBrokerList(ctx, vCluster, ar)
if err != nil {
return fmt.Errorf("unable to create admin client: %w", err)
log.Error(err, "stopping decommission verification due to missing broker list")
return
}
adminBrokers, err := adminClient.Brokers(ctx)

pods, err := r.podList(ctx, vCluster)
if err != nil {
return fmt.Errorf("unable to fetch brokers: %w", err)
log.Error(err, "unable to fetch PodList")
return
}

if int32(len(pods.Items)) != ptr.Deref(vCluster.Spec.Replicas, 0) {
log.Info("can not calculate which broker should be decommissioned as not all Pods are running",
"replica-number", ptr.Deref(vCluster.Spec.Replicas, 0),
"pod-len", len(pods.Items))
}

// Create map of existing Redpanda IDs from Redpanda Pod Annotations.
actualBrokerIDs := make(map[int]any, len(pods.Items))
for i := range pods.Items {
pod := &pods.Items[i]
if pod.Annotations == nil {
return fmt.Errorf("requeuing: %w", &missingBrokerIDError{})
log.Error(&missingBrokerIDError{}, "missing annotations in pod", "pod-name", pod.Name)
}

nodeIDStrAnnotation, annotationExist := pod.Annotations[resources.PodAnnotationNodeIDKey]
if !annotationExist {
return fmt.Errorf("requeuing: %w", &missingBrokerIDError{})
log.Error(&missingBrokerIDError{}, "annotations does not have broker id annotation", "pod-name", pod.Name)
return
}
id, err := strconv.Atoi(nodeIDStrAnnotation)

var id int
id, err = strconv.Atoi(nodeIDStrAnnotation)
if err != nil {
return fmt.Errorf("pod %s has an invalid broker-id annotation: %q: %w", pod.Name, nodeIDStrAnnotation, err)
log.Error(fmt.Errorf("pod %s has an invalid broker-id annotation: %q: %w", pod.Name, nodeIDStrAnnotation, err),
"skipping ghost broker checks due to invalid broker ID in annotation",
"current-broker-ids-map", actualBrokerIDs,
"pod-name", pod.Name, "pod-annotation", nodeIDStrAnnotation)
return
}

actualBrokerIDs[id] = nil
}

// if the admin API shows brokers that are not assigned to existing pods, decommission them
for i := range adminBrokers {
broker := adminBrokers[i]
if _, ok := actualBrokerIDs[broker.NodeID]; ok {
continue
}
if err := adminClient.DecommissionBroker(ctx, broker.NodeID); err != nil {
return fmt.Errorf("failed to decommission ghost broker: %w", err)
var nodesConsideredDown []int
for _, b := range bl {
_, isInK8S := actualBrokerIDs[b.NodeID]
if !isInK8S && b.MembershipStatus == admin.MembershipStatusActive && b.IsAlive != nil && !*b.IsAlive {
nodesConsideredDown = append(nodesConsideredDown, b.NodeID)
}
}

return nil
log.Info("Nodes that are reported by Redpanda, but are not running in Kubernetes cluster",
"nodes-considered-down", nodesConsideredDown,
"cluster-health", health,
"broker-list", bl,
"actual-broker-ids", actualBrokerIDs)
for _, ndID := range nodesConsideredDown {
l.Info("decommissioning ghost broker", "node-id", ndID)
if err = adminClient.DecommissionBroker(ctx, ndID); err != nil {
log.Error(err, "failed to decommission ghost broker",
"node-id", ndID,
"nodes-considered-down", nodesConsideredDown,
"cluster-health", health,
"broker-list", bl,
"actual-broker-ids", actualBrokerIDs)
continue
}
}
}

// createUserOnAdminAPI will return to requeue only when api error occurred
