Skip to content

Commit

Permalink
Merge pull request #9750 from RafalKorepta/rk/improve-ghost-nodes-rem…
Browse files Browse the repository at this point in the history
…oval

k8s: Improve ghost nodes removal
  • Loading branch information
RafalKorepta committed Apr 3, 2023
2 parents 51e577a + f745014 commit f7869db
Showing 1 changed file with 38 additions and 1 deletion.
39 changes: 38 additions & 1 deletion src/go/k8s/controllers/redpanda/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func (r *ClusterReconciler) setPodNodeIDAnnotation(

nodeID, err := r.fetchAdminNodeID(ctx, rp, pod, log)
if err != nil {
return fmt.Errorf("cannot fetch node id for node-id annotation: %w", err)
return fmt.Errorf(`cannot fetch node id for "%s" node-id annotation: %w`, pod.Name, err)
}

realNodeIDStr := fmt.Sprintf("%d", nodeID)
Expand All @@ -537,6 +537,15 @@ func (r *ClusterReconciler) setPodNodeIDAnnotation(
continue
}

oldNodeID, err := strconv.Atoi(nodeIDStr)
if err != nil {
return fmt.Errorf("unable to convert node ID (%s) to int: %w", nodeIDStr, err)
}

if err = r.decommissionBroker(ctx, rp, oldNodeID, log); err != nil {
return fmt.Errorf("unable to decommission broker: %w", err)
}

log.WithValues("pod-name", pod.Name, "node-id", nodeID).Info("setting node-id annotation")
pod.Annotations[PodAnnotationNodeIDKey] = realNodeIDStr
if err := r.Update(ctx, pod, &client.UpdateOptions{}); err != nil {
Expand All @@ -546,6 +555,34 @@ func (r *ClusterReconciler) setPodNodeIDAnnotation(
return nil
}

// decommissionBroker requests decommissioning of the broker identified by
// nodeID through the cluster's admin API.
//
// It derives the headless and cluster services from rp, builds the PKI via
// certmanager.NewPki, and obtains an admin API client from
// r.AdminAPIClientFactory before issuing the decommission call. Any failure
// along the way is returned wrapped with the step that failed.
func (r *ClusterReconciler) decommissionBroker(
	ctx context.Context, rp *redpandav1alpha1.Cluster, nodeID int, log logr.Logger,
) error {
	log.V(6).WithValues("node-id", nodeID).Info("decommission broker")

	// Resolve the services whose FQDNs are needed for the PKI and admin client.
	ports := networking.NewRedpandaPorts(rp)
	hlPorts := collectHeadlessPorts(ports)
	clPorts := collectClusterPorts(ports, rp)
	headless := resources.NewHeadlessService(r.Client, rp, r.Scheme, hlPorts, log)
	cluster := resources.NewClusterService(r.Client, rp, r.Scheme, clPorts, log)

	// The headless FQDN is used both for the PKI and for the admin client.
	headlessFQDN := headless.HeadlessServiceFQDN(r.clusterDomain)
	clusterFQDN := cluster.ServiceFQDN(r.clusterDomain)

	pki, err := certmanager.NewPki(ctx, r.Client, rp, headlessFQDN, clusterFQDN, r.Scheme, log)
	if err != nil {
		return fmt.Errorf("unable to create pki: %w", err)
	}

	admin, err := r.AdminAPIClientFactory(ctx, r.Client, rp, headlessFQDN, pki.AdminAPIConfigProvider())
	if err != nil {
		return fmt.Errorf("unable to create admin client: %w", err)
	}

	if err := admin.DecommissionBroker(ctx, nodeID); err != nil {
		return fmt.Errorf("unable to decommission broker: %w", err)
	}
	return nil
}

func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *redpandav1alpha1.Cluster, pod *corev1.Pod, log logr.Logger) (int32, error) {
redpandaPorts := networking.NewRedpandaPorts(rp)
headlessPorts := collectHeadlessPorts(redpandaPorts)
Expand Down

0 comments on commit f7869db

Please sign in to comment.