Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not perform Redpanda decommission based on annotation #161

Merged
merged 2 commits into from
Jul 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 84 additions & 77 deletions src/go/k8s/internal/controller/redpanda/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/util/retry"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -269,12 +270,10 @@ func (r *ClusterReconciler) Reconcile(
RequeueAfter: 4 * time.Second,
}, nil
}

if err := r.setPodNodeIDLabel(ctx, &vectorizedCluster, log, ar); err != nil {
return ctrl.Result{}, fmt.Errorf("setting pod node_id label: %w", err)
}
if err := r.decommissionGhostBrokers(ctx, &vectorizedCluster, log, ar); err != nil {
return ctrl.Result{}, fmt.Errorf("deleting ghost brokers: %w", err)
}

// want: refactor above to resources (i.e. setInitialSuperUserPassword, reconcileConfiguration)
// ensuring license must be at the end when condition ClusterConfigured=true and AdminAPI is ready
Expand All @@ -288,6 +287,10 @@ func (r *ClusterReconciler) Reconcile(
return ctrl.Result{}, err
}

if r.GhostDecommissioning {
r.decommissionGhostBrokers(ctx, &vectorizedCluster, log, ar)
}

return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -370,21 +373,6 @@ func (r *ClusterReconciler) setPodNodeIDAnnotation(
continue
}

var oldNodeID int
if annotationExist {
oldNodeID, err = strconv.Atoi(nodeIDStrAnnotation)
if err != nil {
combinedErrors = errors.Join(combinedErrors, fmt.Errorf("unable to convert node ID (%s) to int: %w", nodeIDStrAnnotation, err))
continue
}

log.WithValues("pod-name", pod.Name, "old-node-id", oldNodeID).Info("decommission old node-id")
if err = r.decommissionBroker(ctx, rp, oldNodeID, log, ar); err != nil {
combinedErrors = errors.Join(combinedErrors, fmt.Errorf("unable to decommission broker: %w", err))
continue
}
}

log.WithValues("pod-name", pod.Name, "new-node-id", nodeID).Info("setting node-id annotation")
pod.Annotations[resources.PodAnnotationNodeIDKey] = realNodeIDStr
if err := r.Update(ctx, pod, &client.UpdateOptions{}); err != nil {
Expand Down Expand Up @@ -432,29 +420,6 @@ func (r *ClusterReconciler) setPodNodeIDLabel(
return nil
}

func (r *ClusterReconciler) decommissionBroker(
ctx context.Context, rp *vectorizedv1alpha1.Cluster, nodeID int, l logr.Logger, ar *attachedResources,
) error {
log := l.WithName("decommissionBroker").WithValues("node-id", nodeID)
log.V(logger.DebugLevel).Info("decommission broker")

pki, err := ar.getPKI()
if err != nil {
return fmt.Errorf("getting pki: %w", err)
}

adminClient, err := r.AdminAPIClientFactory(ctx, r.Client, rp, ar.getHeadlessServiceFQDN(), pki.AdminAPIConfigProvider())
if err != nil {
return fmt.Errorf("unable to create admin client: %w", err)
}

err = adminClient.DecommissionBroker(ctx, nodeID)
if err != nil && !strings.Contains(err.Error(), "failed: Not Found") {
return fmt.Errorf("unable to decommission broker: %w", err)
}
return nil
}

func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *vectorizedv1alpha1.Cluster, pod *corev1.Pod, ar *attachedResources) (int32, error) {
pki, err := ar.getPKI()
if err != nil {
Expand Down Expand Up @@ -859,77 +824,119 @@ func (r *ClusterReconciler) setInitialSuperUserPassword(
return utilerrors.NewAggregate(errs)
}

// decommissionGhostBrokers decommissions brokers that redpanda thinks exists, but aren't assigned to any pods
// This is not a reversible process. If creating a new broker due to an empty disk was a mistake, the data
// that was on that disk will be unusable.
func (r *ClusterReconciler) decommissionGhostBrokers(c context.Context, vCluster *vectorizedv1alpha1.Cluster, l logr.Logger, ar *attachedResources) error {
if r.GhostDecommissioning {
return r.doDecommissionGhostBrokers(c, vCluster, l, ar)
}
return nil
}

// custom error to satisfy err113
type missingBrokerIDError struct{}

func (m *missingBrokerIDError) Error() string {
return "a pod is temporarily missing the broker-id annotation"
}

func (r *ClusterReconciler) doDecommissionGhostBrokers(c context.Context, vCluster *vectorizedv1alpha1.Cluster, l logr.Logger, ar *attachedResources) error {
ctx, done := context.WithCancel(c)
defer done()
log := l.WithName("deleteGhostBrokers")
log.V(logger.DebugLevel).Info("deleting ghost brokers")
// rpBrokerList returns Redpanda view of registered brokers. Health overview of the cluster is just
// an information during debugging.
func (r *ClusterReconciler) rpBrokerList(ctx context.Context, vCluster *vectorizedv1alpha1.Cluster, ar *attachedResources) (adminutils.AdminAPIClient, []admin.Broker, *admin.ClusterHealthOverview, error) {
pki, err := ar.getPKI()
if err != nil {
return nil, nil, nil, fmt.Errorf("getting pki: %w", err)
}

pods, err := r.podList(ctx, vCluster)
adminClient, err := r.AdminAPIClientFactory(ctx, r.Client, vCluster, ar.getHeadlessServiceFQDN(), pki.AdminAPIConfigProvider())
if err != nil {
return fmt.Errorf("unable to fetch PodList: %w", err)
return nil, nil, nil, fmt.Errorf("creating admin client: %w", err)
}

pki, err := ar.getPKI()
bl, err := adminClient.Brokers(ctx)
if err != nil {
return fmt.Errorf("getting pki: %w", err)
return nil, nil, nil, fmt.Errorf("retrieving broker list: %w", err)
}

adminClient, err := r.AdminAPIClientFactory(ctx, r.Client, vCluster, ar.getHeadlessServiceFQDN(), pki.AdminAPIConfigProvider())
// Regardless of the problem with getting cluster health overview return broker list
health, _ := adminClient.GetHealthOverview(ctx)

return adminClient, bl, &health, nil
}

// decommissionGhostBrokers will calculate the difference between Broker IDs reported
// in Pod Annotations map and the broker list returned from Redpanda Admin API. Any
// Redpanda reported as not alive would be decommissioned.
// This is not a reversible process. If creating a new broker due to an empty disk was a mistake, the data
// that was on that disk will be unusable.
// This function will not requeue Cluster reconciliation loop.
func (r *ClusterReconciler) decommissionGhostBrokers(c context.Context, vCluster *vectorizedv1alpha1.Cluster, l logr.Logger, ar *attachedResources) {
ctx, done := context.WithCancel(c)
defer done()
log := l.WithName("doDecommissionGhostBrokers")
log.Info("deleting ghost brokers")

adminClient, bl, health, err := r.rpBrokerList(ctx, vCluster, ar)
if err != nil {
return fmt.Errorf("unable to create admin client: %w", err)
log.Error(err, "stopping decommission verification due to missing broker list")
return
}
adminBrokers, err := adminClient.Brokers(ctx)

pods, err := r.podList(ctx, vCluster)
if err != nil {
return fmt.Errorf("unable to fetch brokers: %w", err)
log.Error(err, "unable to fetch PodList")
return
}

if int32(len(pods.Items)) != ptr.Deref(vCluster.Spec.Replicas, 0) {
log.Info("can not calculate which broker should be decommissioned as not all Pods are running",
"replica-number", ptr.Deref(vCluster.Spec.Replicas, 0),
"pod-len", len(pods.Items))
}

// Create map of existing Redpanda IDs from Redpanda Pod Annotations.
actualBrokerIDs := make(map[int]any, len(pods.Items))
for i := range pods.Items {
pod := &pods.Items[i]
if pod.Annotations == nil {
return fmt.Errorf("requeuing: %w", &missingBrokerIDError{})
log.Error(&missingBrokerIDError{}, "missing annotations in pod", "pod-name", pod.Name)
}

nodeIDStrAnnotation, annotationExist := pod.Annotations[resources.PodAnnotationNodeIDKey]
if !annotationExist {
return fmt.Errorf("requeuing: %w", &missingBrokerIDError{})
log.Error(&missingBrokerIDError{}, "annotations does not have broker id annotation", "pod-name", pod.Name)
return
}
id, err := strconv.Atoi(nodeIDStrAnnotation)

var id int
id, err = strconv.Atoi(nodeIDStrAnnotation)
if err != nil {
return fmt.Errorf("pod %s has an invalid broker-id annotation: %q: %w", pod.Name, nodeIDStrAnnotation, err)
log.Error(fmt.Errorf("pod %s has an invalid broker-id annotation: %q: %w", pod.Name, nodeIDStrAnnotation, err),
"skipping ghost broker checks due to invalid broker ID in annotation",
"current-broker-ids-map", actualBrokerIDs,
"pod-name", pod.Name, "pod-annotation", nodeIDStrAnnotation)
return
}

actualBrokerIDs[id] = nil
}

// if the admin API shows brokers that are not assigned to existing pods, decommission them
for i := range adminBrokers {
broker := adminBrokers[i]
if _, ok := actualBrokerIDs[broker.NodeID]; ok {
continue
}
if err := adminClient.DecommissionBroker(ctx, broker.NodeID); err != nil {
return fmt.Errorf("failed to decommission ghost broker: %w", err)
var nodesConsideredDown []int
for _, b := range bl {
_, isInK8S := actualBrokerIDs[b.NodeID]
if !isInK8S && b.MembershipStatus == admin.MembershipStatusActive && b.IsAlive != nil && !*b.IsAlive {
nodesConsideredDown = append(nodesConsideredDown, b.NodeID)
}
}

return nil
log.Info("Nodes that are reported by Redpanda, but are not running in Kubernetes cluster",
"nodes-considered-down", nodesConsideredDown,
"cluster-health", health,
"broker-list", bl,
"actual-broker-ids", actualBrokerIDs)
for _, ndID := range nodesConsideredDown {
l.Info("decommissioning ghost broker", "node-id", ndID)
if err = adminClient.DecommissionBroker(ctx, ndID); err != nil {
log.Error(err, "failed to decommission ghost broker",
"node-id", ndID,
"nodes-considered-down", nodesConsideredDown,
"cluster-health", health,
"broker-list", bl,
"actual-broker-ids", actualBrokerIDs)
continue
}
}
}

// createUserOnAdminAPI will return to requeue only when api error occurred
Expand Down