Skip to content

Commit

Permalink
Decommission ghost brokers based on health overview
Browse files: browse the repository at this point in the history
  • Loading branch information
RafalKorepta committed Jun 19, 2024
1 parent 4a192f9 commit 9b432c8
Showing 1 changed file with 40 additions and 7 deletions.
47 changes: 40 additions & 7 deletions src/go/k8s/internal/controller/redpanda/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,9 +985,43 @@ func (r *ClusterReconciler) doDecommissionGhostBrokers(c context.Context, vClust
if err != nil {
return fmt.Errorf("unable to create admin client: %w", err)
}
adminBrokers, err := adminClient.Brokers(ctx)

health, err := adminClient.GetHealthOverview(ctx)
if err != nil {
return fmt.Errorf("feating cluster heatlh overview: %w", err)
}

if !health.IsHealthy {
err = errors.New("cluster is not healthy")
log.V(logger.DebugLevel).Error(err, "stopping decommission verification",
"requested-replicas", vCluster.Spec.Replicas,
"nodes-registered", health.AllNodes,
"nodes-down", health.NodesDown)
return err
}

ss, err := ar.getStatefulSet()
if err != nil {
return fmt.Errorf("unable to fetch brokers: %w", err)
return err
}

sts := &appsv1.StatefulSet{}
if err := r.Client.Get(ctx, ss.Key(), sts); err != nil {
return fmt.Errorf("could not retrieve the statefulset: %w", err)
}

if vCluster.Spec.Replicas == nil {
return nil
}

if len(health.AllNodes) <= int(*vCluster.Spec.Replicas) {
log.V(logger.DebugLevel).Info("not enough nodes to check decommission", "all-nodes", health.AllNodes, "replicas", *vCluster.Spec.Replicas)
return nil
}

if sts.Status.Replicas != *vCluster.Spec.Replicas || sts.Status.AvailableReplicas != *vCluster.Spec.Replicas {
return fmt.Errorf("statefulset status does not match cluster specification available replicas (%d), replicas (%d), spec replicas (%d)",
sts.Status.AvailableReplicas, sts.Status.Replicas, *vCluster.Spec.Replicas)
}

actualBrokerIDs := make(map[int]any, len(pods.Items))
Expand All @@ -1007,13 +1041,12 @@ func (r *ClusterReconciler) doDecommissionGhostBrokers(c context.Context, vClust
actualBrokerIDs[id] = nil
}

// if the admin API shows brokers that are not assigned to existing pods, decommission them
for i := range adminBrokers {
broker := adminBrokers[i]
if _, ok := actualBrokerIDs[broker.NodeID]; ok {
for i := range health.NodesDown {
nID := health.NodesDown[i]
if _, ok := actualBrokerIDs[nID]; ok {
continue
}
if err := adminClient.DecommissionBroker(ctx, broker.NodeID); err != nil {
if err := adminClient.DecommissionBroker(ctx, nID); err != nil {
return fmt.Errorf("failed to decommission ghost broker: %w", err)
}
}
Expand Down

0 comments on commit 9b432c8

Please sign in to comment.