diff --git a/src/go/k8s/Makefile b/src/go/k8s/Makefile index 52c47f77..46314854 100644 --- a/src/go/k8s/Makefile +++ b/src/go/k8s/Makefile @@ -230,7 +230,7 @@ lint: golangci-lint .PHONY: lint-fix lint-fix: golangci-lint ## Run golangci-lint linter and perform fixes - $(GOLANGCI_LINT) run --fix + $(GOLANGCI_LINT) run --fix --timeout 30m .PHONY: install-prometheus install-prometheus: diff --git a/src/go/k8s/internal/controller/redpanda/cluster_controller.go b/src/go/k8s/internal/controller/redpanda/cluster_controller.go index 4b3e46df..e38c1ebe 100644 --- a/src/go/k8s/internal/controller/redpanda/cluster_controller.go +++ b/src/go/k8s/internal/controller/redpanda/cluster_controller.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/util/retry" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -269,12 +270,10 @@ func (r *ClusterReconciler) Reconcile( RequeueAfter: 4 * time.Second, }, nil } + if err := r.setPodNodeIDLabel(ctx, &vectorizedCluster, log, ar); err != nil { return ctrl.Result{}, fmt.Errorf("setting pod node_id label: %w", err) } - if err := r.decommissionGhostBrokers(ctx, &vectorizedCluster, log, ar); err != nil { - return ctrl.Result{}, fmt.Errorf("deleting ghost brokers: %w", err) - } // want: refactor above to resources (i.e. setInitialSuperUserPassword, reconcileConfiguration) // ensuring license must be at the end when condition ClusterConfigured=true and AdminAPI is ready @@ -288,6 +287,10 @@ func (r *ClusterReconciler) Reconcile( return ctrl.Result{}, err } + if r.GhostDecommissioning { + r.decommissionGhostBrokers(ctx, &vectorizedCluster, log, ar) + } + return ctrl.Result{}, nil } @@ -821,16 +824,6 @@ func (r *ClusterReconciler) setInitialSuperUserPassword( return utilerrors.NewAggregate(errs) } -// decommissionGhostBrokers decommissions brokers that redpanda thinks exists, but aren't assigned to any pods -// This is not a reversible process. If creating a new broker due to an empty disk was a mistake, the data -// that was on that disk will be unusable. -func (r *ClusterReconciler) decommissionGhostBrokers(c context.Context, vCluster *vectorizedv1alpha1.Cluster, l logr.Logger, ar *attachedResources) error { - if r.GhostDecommissioning { - return r.doDecommissionGhostBrokers(c, vCluster, l, ar) - } - return nil -} - // custom error to satisfy err113 type missingBrokerIDError struct{} @@ -838,60 +831,112 @@ func (m *missingBrokerIDError) Error() string { return "a pod is temporarily missing the broker-id annotation" } -func (r *ClusterReconciler) doDecommissionGhostBrokers(c context.Context, vCluster *vectorizedv1alpha1.Cluster, l logr.Logger, ar *attachedResources) error { - ctx, done := context.WithCancel(c) - defer done() - log := l.WithName("deleteGhostBrokers") - log.V(logger.DebugLevel).Info("deleting ghost brokers") +// rpBrokerList returns Redpanda view of registered brokers. Health overview of the cluster is just +// an information during debugging. +func (r *ClusterReconciler) rpBrokerList(ctx context.Context, vCluster *vectorizedv1alpha1.Cluster, ar *attachedResources) (adminutils.AdminAPIClient, []admin.Broker, *admin.ClusterHealthOverview, error) { + pki, err := ar.getPKI() + if err != nil { + return nil, nil, nil, fmt.Errorf("getting pki: %w", err) + } - pods, err := r.podList(ctx, vCluster) + adminClient, err := r.AdminAPIClientFactory(ctx, r.Client, vCluster, ar.getHeadlessServiceFQDN(), pki.AdminAPIConfigProvider()) if err != nil { - return fmt.Errorf("unable to fetch PodList: %w", err) + return nil, nil, nil, fmt.Errorf("creating admin client: %w", err) } - pki, err := ar.getPKI() + bl, err := adminClient.Brokers(ctx) if err != nil { - return fmt.Errorf("getting pki: %w", err) + return nil, nil, nil, fmt.Errorf("retrieving broker list: %w", err) } - adminClient, err := r.AdminAPIClientFactory(ctx, r.Client, vCluster, ar.getHeadlessServiceFQDN(), pki.AdminAPIConfigProvider()) + // Regardless of the problem with getting cluster health overview return broker list + health, _ := adminClient.GetHealthOverview(ctx) + + return adminClient, bl, &health, nil +} + +// decommissionGhostBrokers will calculate the difference between Broker IDs reported +// in Pod Annotations map and the broker list returned from Redpanda Admin API. Any +// Redpanda reported as not alive would be decommissioned. +// This is not a reversible process. If creating a new broker due to an empty disk was a mistake, the data +// that was on that disk will be unusable. +// This function will not requeue Cluster reconciliation loop. +func (r *ClusterReconciler) decommissionGhostBrokers(c context.Context, vCluster *vectorizedv1alpha1.Cluster, l logr.Logger, ar *attachedResources) { + ctx, done := context.WithCancel(c) + defer done() + log := l.WithName("doDecommissionGhostBrokers") + log.Info("deleting ghost brokers") + + adminClient, bl, health, err := r.rpBrokerList(ctx, vCluster, ar) if err != nil { - return fmt.Errorf("unable to create admin client: %w", err) + log.Error(err, "stopping decommission verification due to missing broker list") + return } - adminBrokers, err := adminClient.Brokers(ctx) + + pods, err := r.podList(ctx, vCluster) if err != nil { - return fmt.Errorf("unable to fetch brokers: %w", err) + log.Error(err, "unable to fetch PodList") + return } + if int32(len(pods.Items)) != ptr.Deref(vCluster.Spec.Replicas, 0) { + log.Info("can not calculate which broker should be decommissioned as not all Pods are running", + "replica-number", ptr.Deref(vCluster.Spec.Replicas, 0), + "pod-len", len(pods.Items)) + } + + // Create map of existing Redpanda IDs from Redpanda Pod Annotations. actualBrokerIDs := make(map[int]any, len(pods.Items)) for i := range pods.Items { pod := &pods.Items[i] if pod.Annotations == nil { - return fmt.Errorf("requeuing: %w", &missingBrokerIDError{}) + log.Error(&missingBrokerIDError{}, "missing annotations in pod", "pod-name", pod.Name) } + nodeIDStrAnnotation, annotationExist := pod.Annotations[resources.PodAnnotationNodeIDKey] if !annotationExist { - return fmt.Errorf("requeuing: %w", &missingBrokerIDError{}) + log.Error(&missingBrokerIDError{}, "annotations does not have broker id annotation", "pod-name", pod.Name) + return } - id, err := strconv.Atoi(nodeIDStrAnnotation) + + var id int + id, err = strconv.Atoi(nodeIDStrAnnotation) if err != nil { - return fmt.Errorf("pod %s has an invalid broker-id annotation: %q: %w", pod.Name, nodeIDStrAnnotation, err) + log.Error(fmt.Errorf("pod %s has an invalid broker-id annotation: %q: %w", pod.Name, nodeIDStrAnnotation, err), + "skipping ghost broker checks due to invalid broker ID in annotation", + "current-broker-ids-map", actualBrokerIDs, + "pod-name", pod.Name, "pod-annotation", nodeIDStrAnnotation) + return } + actualBrokerIDs[id] = nil } - // if the admin API shows brokers that are not assigned to existing pods, decommission them - for i := range adminBrokers { - broker := adminBrokers[i] - if _, ok := actualBrokerIDs[broker.NodeID]; ok { - continue - } - if err := adminClient.DecommissionBroker(ctx, broker.NodeID); err != nil { - return fmt.Errorf("failed to decommission ghost broker: %w", err) + var nodesConsideredDown []int + for _, b := range bl { + _, isInK8S := actualBrokerIDs[b.NodeID] + if !isInK8S && b.MembershipStatus == admin.MembershipStatusActive && b.IsAlive != nil && !*b.IsAlive { + nodesConsideredDown = append(nodesConsideredDown, b.NodeID) } } - return nil + log.Info("Nodes that are reported by Redpanda, but are not running in Kubernetes cluster", + "nodes-considered-down", nodesConsideredDown, + "cluster-health", health, + "broker-list", bl, + "actual-broker-ids", actualBrokerIDs) + for _, ndID := range nodesConsideredDown { + l.Info("decommissioning ghost broker", "node-id", ndID) + if err = adminClient.DecommissionBroker(ctx, ndID); err != nil { + log.Error(err, "failed to decommission ghost broker", + "node-id", ndID, + "nodes-considered-down", nodesConsideredDown, + "cluster-health", health, + "broker-list", bl, + "actual-broker-ids", actualBrokerIDs) + continue + } + } } // createUserOnAdminAPI will return to requeue only when api error occurred