Skip to content

Commit

Permalink
Decommission ghost brokers based on health overview
Browse files Browse the repository at this point in the history
  • Loading branch information
RafalKorepta committed Jun 21, 2024
1 parent 7c8ba1c commit ead39de
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/go/k8s/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ lint: golangci-lint

.PHONY: lint-fix
lint-fix: golangci-lint ## Run golangci-lint linter and perform fixes
$(GOLANGCI_LINT) run --fix
$(GOLANGCI_LINT) run --fix --timeout 30m

.PHONY: install-prometheus
install-prometheus:
Expand Down
70 changes: 50 additions & 20 deletions src/go/k8s/internal/controller/redpanda/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/util/retry"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -786,7 +787,7 @@ func (r *ClusterReconciler) createExternalNodesList(
}
}

if externalKafkaListener != nil && len(externalKafkaListener.External.Subdomain) > 0 {
if externalKafkaListener != nil && externalKafkaListener.External.Subdomain != "" {
address, err := subdomainAddress(externalKafkaListener.External.EndpointTemplate, &pod, externalKafkaListener.External.Subdomain, getNodePort(&nodePortSvc, resources.ExternalListenerName))
if err != nil {
return nil, err
Expand All @@ -800,7 +801,7 @@ func (r *ClusterReconciler) createExternalNodesList(
))
}

if externalAdminListener != nil && len(externalAdminListener.External.Subdomain) > 0 {
if externalAdminListener != nil && externalAdminListener.External.Subdomain != "" {
address, err := subdomainAddress(externalAdminListener.External.EndpointTemplate, &pod, externalAdminListener.External.Subdomain, getNodePort(&nodePortSvc, resources.AdminPortExternalName))
if err != nil {
return nil, err
Expand All @@ -814,7 +815,7 @@ func (r *ClusterReconciler) createExternalNodesList(
))
}

if externalProxyListener != nil && len(externalProxyListener.External.Subdomain) > 0 {
if externalProxyListener != nil && externalProxyListener.External.Subdomain != "" {
address, err := subdomainAddress(externalProxyListener.External.EndpointTemplate, &pod, externalProxyListener.External.Subdomain, getNodePort(&nodePortSvc, resources.PandaproxyPortExternalName))
if err != nil {
return nil, err
Expand All @@ -837,7 +838,7 @@ func (r *ClusterReconciler) createExternalNodesList(
}
}

if schemaRegistryConf != nil && schemaRegistryConf.External != nil && len(schemaRegistryConf.External.Subdomain) > 0 {
if schemaRegistryConf != nil && schemaRegistryConf.External != nil && schemaRegistryConf.External.Subdomain != "" {
prefix := ""
if schemaRegistryConf.External.Endpoint != "" {
prefix = fmt.Sprintf("%s.", schemaRegistryConf.External.Endpoint)
Expand All @@ -849,7 +850,7 @@ func (r *ClusterReconciler) createExternalNodesList(
)
}

if externalProxyListener != nil && len(externalProxyListener.External.Subdomain) > 0 {
if externalProxyListener != nil && externalProxyListener.External.Subdomain != "" {
result.PandaproxyIngress = &externalProxyListener.External.Subdomain
}

Expand Down Expand Up @@ -968,14 +969,9 @@ func (m *missingBrokerIDError) Error() string {
func (r *ClusterReconciler) doDecommissionGhostBrokers(c context.Context, vCluster *vectorizedv1alpha1.Cluster, l logr.Logger, ar *attachedResources) error {
ctx, done := context.WithCancel(c)
defer done()
log := l.WithName("deleteGhostBrokers")
log := l.WithName("doDecommissionGhostBrokers")
log.V(logger.DebugLevel).Info("deleting ghost brokers")

pods, err := r.podList(ctx, vCluster)
if err != nil {
return fmt.Errorf("unable to fetch PodList: %w", err)
}

pki, err := ar.getPKI()
if err != nil {
return fmt.Errorf("getting pki: %w", err)
Expand All @@ -985,35 +981,69 @@ func (r *ClusterReconciler) doDecommissionGhostBrokers(c context.Context, vClust
if err != nil {
return fmt.Errorf("unable to create admin client: %w", err)
}
adminBrokers, err := adminClient.Brokers(ctx)

health, err := adminClient.GetHealthOverview(ctx)
if err != nil {
return fmt.Errorf("feating cluster heatlh overview: %w", err)
}

// Until cluster does not report that its healthy do not decommission
if !health.IsHealthy {
err = errors.New("cluster is not healthy") //nolint:err133 // That error will not be handled by reconciler function
log.V(logger.DebugLevel).Error(err, "stopping decommission verification",
"requested-replicas", vCluster.Spec.Replicas,
"nodes-registered", health.AllNodes,
"nodes-down", health.NodesDown)
return err
}

pods, err := r.podList(ctx, vCluster)
if err != nil {
return fmt.Errorf("unable to fetch brokers: %w", err)
return fmt.Errorf("unable to fetch PodList: %w", err)
}

// Create map of Redpanda IDs from Pod resource
actualBrokerIDs := make(map[int]any, len(pods.Items))
for i := range pods.Items {
pod := &pods.Items[i]
if pod.Annotations == nil {
return fmt.Errorf("requeuing: %w", &missingBrokerIDError{})
}

nodeIDStrAnnotation, annotationExist := pod.Annotations[resources.PodAnnotationNodeIDKey]
if !annotationExist {
return fmt.Errorf("requeuing: %w", &missingBrokerIDError{})
}
id, err := strconv.Atoi(nodeIDStrAnnotation)

var id int
id, err = strconv.Atoi(nodeIDStrAnnotation)
if err != nil {
return fmt.Errorf("pod %s has an invalid broker-id annotation: %q: %w", pod.Name, nodeIDStrAnnotation, err)
}

actualBrokerIDs[id] = nil
}

// if the admin API shows brokers that are not assigned to existing pods, decommission them
for i := range adminBrokers {
broker := adminBrokers[i]
if _, ok := actualBrokerIDs[broker.NodeID]; ok {
continue
// Reduce brokers to the list of brokers that are not visible at the K8S level
bl, err := adminClient.Brokers(ctx)
if err != nil {
return fmt.Errorf("retrieving broker list: %w", err)
}

log.V(logger.DebugLevel).Info("removes Redpanda which reported its ID (in Pod annotation) against Redpanda broker list",
"broker-list", bl, "actual-broker-ids", actualBrokerIDs)
var nodesConsideredDown []int
for _, b := range bl {
if _, isInK8S := actualBrokerIDs[b.NodeID]; !(ptr.Deref(b.IsAlive, true) && b.MembershipStatus == admin.MembershipStatusActive && isInK8S) {
nodesConsideredDown = append(nodesConsideredDown, b.NodeID)
}
if err := adminClient.DecommissionBroker(ctx, broker.NodeID); err != nil {
}

log.V(logger.DebugLevel).Info("Nodes that found to be down",
"nodes-considered-down", nodesConsideredDown, "health-nodes-down", health.NodesDown)
for _, ndID := range nodesConsideredDown {
l.Info("decommission broker", "node-id", ndID)
if err := adminClient.DecommissionBroker(ctx, ndID); err != nil {
return fmt.Errorf("failed to decommission ghost broker: %w", err)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ var _ = Describe("RedPandaCluster controller", func() {
len(rc.Status.Nodes.ExternalAdmin) == 1 &&
len(rc.Status.Nodes.ExternalPandaproxy) == 1 &&
len(rc.Status.Nodes.SchemaRegistry.ExternalNodeIPs) == 1 &&
len(rc.Status.Nodes.SchemaRegistry.Internal) > 0 &&
rc.Status.Nodes.SchemaRegistry.Internal != "" &&
rc.Status.Nodes.SchemaRegistry.External == "" // Without subdomain the external address is empty
}, timeout, interval).Should(BeTrue())
})
Expand Down Expand Up @@ -449,7 +449,7 @@ var _ = Describe("RedPandaCluster controller", func() {
err := k8sClient.Get(context.Background(), key, &cluster)
return err == nil &&
cluster.Status.Nodes.SchemaRegistry != nil &&
len(cluster.Status.Nodes.SchemaRegistry.Internal) > 0 &&
cluster.Status.Nodes.SchemaRegistry.Internal != "" &&
len(cluster.Status.Nodes.Internal) > 0
}, timeout, interval).Should(BeTrue())
})
Expand Down

0 comments on commit ead39de

Please sign in to comment.