Skip to content

Commit

Permalink
operator: Add the option to immediately decommission ghosts
Browse files Browse the repository at this point in the history
When a broker's storage is deleted, it comes back with a new broker id.
There's no indication as to why the data is gone or whether it can be
recovered. Since the Cluster resource is deprecated and the cloud team
would just like to have a method to delete it regardless, the consensus
is that we should assume it's not recoverable and decommission the old
broker id immediately.

This PR adds a hidden flag, `unsafe-decommission-failed-brokers` that,
if set to true, will offer this behavior.

This is dangerous and shouldn't be used. There are circumstances in which
brokers with valid data can be decommissioned and even the potential for
all the brokers to be decommissioned.
  • Loading branch information
joejulian committed Sep 6, 2023
1 parent 564db32 commit fabca06
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/go/k8s/config/e2e-tests/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ spec:
- "--allow-pvc-deletion=true"
- "--superusers-prefix=__redpanda_system__"
- "--log-level=trace"
- "--unsafe-decommission-failed-brokers=true"
livenessProbe:
timeoutSeconds: 10
readinessProbe:
Expand Down
77 changes: 77 additions & 0 deletions src/go/k8s/controllers/redpanda/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type ClusterReconciler struct {
MetricsTimeout time.Duration
RestrictToRedpandaVersion string
allowPVCDeletion bool
GhostDecommissioning bool
}

//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -269,6 +270,9 @@ func (r *ClusterReconciler) Reconcile(
if err := r.setPodNodeIDLabel(ctx, &vectorizedCluster, log, ar); err != nil {
return ctrl.Result{}, fmt.Errorf("setting pod node_id label: %w", err)
}
if err := r.decommissionGhostBrokers(ctx, &vectorizedCluster, log, ar); err != nil {
return ctrl.Result{}, fmt.Errorf("deleting ghost brokers: %w", err)
}

// want: refactor above to resources (i.e. setInitialSuperUserPassword, reconcileConfiguration)
// ensuring license must be at the end when condition ClusterConfigured=true and AdminAPI is ready
Expand Down Expand Up @@ -961,6 +965,79 @@ func (r *ClusterReconciler) setInitialSuperUserPassword(
return utilerrors.NewAggregate(errs)
}

// decommissionGhostBrokers decommissions brokers that redpanda thinks exists, but aren't assigned to any pods
// This is not a reversible process. If creating a new broker due to an empty disk was a mistake, the data
// that was on that disk will be unusable.
func (r *ClusterReconciler) decommissionGhostBrokers(c context.Context, vCluster *vectorizedv1alpha1.Cluster, l logr.Logger, ar *attachedResources) error {
if r.GhostDecommissioning {
return r.doDecommissionGhostBrokers(c, vCluster, l, ar)
}
return nil
}

// custom error to satisfy err113
type missingBrokerID struct{}

func (m *missingBrokerID) Error() string {
return "a pod is temporarily missing the broker-id annotation"
}

func (r *ClusterReconciler) doDecommissionGhostBrokers(c context.Context, vCluster *vectorizedv1alpha1.Cluster, l logr.Logger, ar *attachedResources) error {
ctx, done := context.WithCancel(c)
defer done()
log := l.WithName("deleteGhostBrokers")
log.V(logger.DebugLevel).Info("deleting ghost brokers")

pods, err := r.podList(ctx, vCluster)
if err != nil {
return fmt.Errorf("unable to fetch PodList: %w", err)
}

pki, err := ar.getPKI()
if err != nil {
return fmt.Errorf("getting pki: %w", err)
}

adminClient, err := r.AdminAPIClientFactory(ctx, r.Client, vCluster, ar.getHeadlessServiceFQDN(), pki.AdminAPIConfigProvider())
if err != nil {
return fmt.Errorf("unable to create admin client: %w", err)
}
adminBrokers, err := adminClient.Brokers(ctx)
if err != nil {
return fmt.Errorf("unable to fetch brokers: %w", err)
}

actualBrokerIDs := make(map[string]any, len(pods.Items))
for i := range pods.Items {
pod := &pods.Items[i]
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
nodeIDStrAnnotation, annotationExist := pod.Annotations[resources.PodAnnotationNodeIDKey]
if !annotationExist {
return fmt.Errorf("requeuing: %w", &missingBrokerID{})
}
_, err := strconv.Atoi(nodeIDStrAnnotation)
if err != nil {
return fmt.Errorf("pod %s has an invalid broker-id annotation: %q: %w", pod.Name, nodeIDStrAnnotation, err)
}
actualBrokerIDs[nodeIDStrAnnotation] = nil
}

// if the admin API shows brokers that are not assigned to existing pods, decommission them
for i := range adminBrokers {
broker := adminBrokers[i]
if _, ok := actualBrokerIDs[strconv.Itoa(broker.NodeID)]; ok {
continue
}
if err := adminClient.DecommissionBroker(ctx, broker.NodeID); err != nil {
return fmt.Errorf("failed to decommission ghost broker: %w", err)
}
}

return nil
}

// createUserOnAdminAPI will return to requeue only when api error occurred
func createUserOnAdminAPI(ctx context.Context, adminAPI adminutils.AdminAPIClient, secret *corev1.Secret) error {
username := string(secret.Data[corev1.BasicAuthUsernameKey])
Expand Down
4 changes: 4 additions & 0 deletions src/go/k8s/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func main() {
// storage driver.
allowPVCDeletion bool
debug bool
ghostbuster bool
)

flag.StringVar(&eventsAddr, "events-addr", "", "The address of the events receiver.")
Expand All @@ -132,6 +133,8 @@ func main() {
flag.StringVar(&vectorizedv1alpha1.SuperUsersPrefix, "superusers-prefix", "", "Prefix to add in username of superusers managed by operator. This will only affect new clusters, enabling this will not add prefix to existing clusters (alpha feature)")
flag.BoolVar(&debug, "debug", false, "Set to enable debugging")
flag.StringVar(&namespace, "namespace", "", "If namespace is set to not empty value, it changes scope of Redpanda operator to work in single namespace")
flag.BoolVar(&ghostbuster, "unsafe-decommission-failed-brokers", false, "Set to enable decommissioning a failed broker that is configured but does not exist in the StatefulSet (ghost broker). This may result in invalidating valid data")
_ = flag.CommandLine.MarkHidden("unsafe-decommission-failed-brokers")

logOptions.BindFlags(flag.CommandLine)
clientOptions.BindFlags(flag.CommandLine)
Expand Down Expand Up @@ -290,6 +293,7 @@ func main() {
DecommissionWaitInterval: decommissionWaitInterval,
MetricsTimeout: metricsTimeout,
RestrictToRedpandaVersion: restrictToRedpandaVersion,
GhostDecommissioning: ghostbuster,
}).WithClusterDomain(clusterDomain).WithConfiguratorSettings(configurator).WithAllowPVCDeletion(allowPVCDeletion).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Unable to create controller", "controller", "Cluster")
os.Exit(1)
Expand Down

0 comments on commit fabca06

Please sign in to comment.