scaling down by removing a nodepool
koikonom committed Jul 5, 2024
1 parent ca5818a commit eabf138
Showing 10 changed files with 132 additions and 98 deletions.
29 changes: 24 additions & 5 deletions src/go/k8s/api/vectorized/v1alpha1/cluster_types.go
@@ -206,6 +206,9 @@ type NodePoolSpec struct {
// assigned to these containers will be required on the cluster on top of
// the resources defined here
Resources RedpandaResourceRequirements `json:"resources"`
+// Removed is set internally to signal the reconciliation loop that this nodepool
+// has been removed from the spec
+Removed bool `json:"removed,omitempty"`
}

// RestartConfig contains strategies to configure how the cluster behaves when restarting, because of upgrades
@@ -433,6 +436,9 @@ type ClusterStatus struct {
// Indicates that a node is currently being decommissioned from the cluster and provides its ordinal number
// +optional
DecommissioningNode *int32 `json:"decommissioningNode,omitempty"`
+// Indicates the pod that hosts the node we are currently decommissioning
+// +optional
+DecommissioningPod string `json:"decommissioningPod,omitempty"`
// Current version of the cluster.
// +optional
Version string `json:"version"`
@@ -687,9 +693,8 @@ type LoadBalancerStatus struct {
}

type NodePoolStatus struct {
-CurrentReplicas int32 `json:"currentReplicas,omitempty"`
-Replicas int32 `json:"replicas,omitempty"`
-Pods []string `json:"pods,omitempty"`
+CurrentReplicas int32 `json:"currentReplicas,omitempty"`
+Replicas int32 `json:"replicas,omitempty"`
}

// KafkaAPITLS configures TLS for redpanda Kafka API
@@ -1191,11 +1196,17 @@ func (r *Cluster) GetCurrentReplicas() int32 {
if r == nil {
return 0
}
-if r.Status.CurrentReplicas <= 0 {
+var currentReplicas int32
+
+for _, np := range r.Status.NodePools {
+currentReplicas += np.CurrentReplicas
+}
+
+if currentReplicas <= 0 {
// Not initialized, let's give the computed value
return r.ComputeInitialCurrentReplicasField()
}
-return r.Status.CurrentReplicas
+return currentReplicas
}

func (r *Cluster) GetReplicas() int32 {
@@ -1419,3 +1430,11 @@ func (r *Cluster) GetDecommissionBrokerID() *int32 {
func (r *Cluster) SetDecommissionBrokerID(id *int32) {
r.Status.DecommissioningNode = id
}

+func (r *Cluster) GetDecommissioningPod() string {
+return r.Status.DecommissioningPod
+}
+
+func (r *Cluster) SetDecommissioningPod(pod string) {
+r.Status.DecommissioningPod = pod
+}
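For orientation, a minimal sketch of the new aggregation semantics: GetCurrentReplicas now sums CurrentReplicas across every node pool recorded in the status, falling back to ComputeInitialCurrentReplicasField only when that sum is zero. The import path, the map shape of Status.NodePools, and the example values are assumptions for illustration, not code from this commit.

package main

import (
    "fmt"

    vectorizedv1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/api/vectorized/v1alpha1"
)

func main() {
    cluster := &vectorizedv1alpha1.Cluster{}
    // Two hypothetical node pools; the getter sums their CurrentReplicas.
    cluster.Status.NodePools = map[string]vectorizedv1alpha1.NodePoolStatus{
        "pool-a": {CurrentReplicas: 3, Replicas: 3},
        "pool-b": {CurrentReplicas: 2, Replicas: 2},
    }
    fmt.Println(cluster.GetCurrentReplicas()) // prints 5
}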
14 changes: 12 additions & 2 deletions src/go/k8s/api/vectorized/v1alpha1/cluster_types_test.go
@@ -224,7 +224,12 @@ func TestInitialReplicas(t *testing.T) {
// backward compatibility. Remove when v22.2 is no longer supported.
cluster := v1alpha1.Cluster{}
cluster.Spec.Version = featuregates.V22_2_1.String()
-cluster.Spec.Replicas = ptr.To(int32(3))
+cluster.Spec.NodePools = []*v1alpha1.NodePoolSpec{
+{
+Name: "test",
+Replicas: ptr.To(int32(3)),
+},
+}
assert.Equal(t, int32(1), cluster.GetCurrentReplicas())
cluster.Status.Replicas = 2
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
@@ -239,7 +244,12 @@ func TestInitialReplicas(t *testing.T) {

// test with latest version
cluster = v1alpha1.Cluster{}
-cluster.Spec.Replicas = ptr.To(int32(3))
+cluster.Spec.NodePools = []*v1alpha1.NodePoolSpec{
+{
+Name: "test",
+Replicas: ptr.To(int32(3)),
+},
+}
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
cluster.Status.Replicas = 2
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
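Read together, the assertions pin down the fallback: a fresh cluster on a pre-v22.3 feature gate reports a single initial replica (nodes are added one at a time), while a cluster that is already running, or one on a newer version, starts from the desired count. A paraphrase of the assumed ComputeInitialCurrentReplicasField behavior, reconstructed from these assertions rather than copied from the source:

// Reconstructed sketch, not verbatim: isRunning would be derived from status
// fields such as Status.Replicas, and emptySeedSupported from the version
// feature gate checked elsewhere in this package.
func computeInitialCurrentReplicas(desired int32, isRunning, emptySeedSupported bool) int32 {
    if isRunning || emptySeedSupported {
        return desired
    }
    return 1
}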
8 changes: 7 additions & 1 deletion src/go/k8s/api/vectorized/v1alpha1/cluster_webhook_test.go
@@ -316,7 +316,7 @@ func TestNilReplicasIsNotAllowed(t *testing.T) {
rpCluster := validRedpandaCluster()
_, err := rpCluster.ValidateCreate()
require.Nil(t, err, "Initial cluster is not valid")
-rpCluster.Spec.Replicas = nil
+rpCluster.Spec.NodePools = nil
_, err = rpCluster.ValidateCreate()
assert.Error(t, err)
}
@@ -1960,6 +1960,12 @@ func validRedpandaCluster() *v1alpha1.Cluster {
Namespace: "",
},
Spec: v1alpha1.ClusterSpec{
+NodePools: []*v1alpha1.NodePoolSpec{
+{
+Name: "test",
+Replicas: ptr.To(int32(3)),
+},
+},
Replicas: ptr.To(int32(1)),
Configuration: v1alpha1.RedpandaConfig{
KafkaAPI: []v1alpha1.KafkaAPI{{Port: 124, AuthenticationMethod: "none"}},
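Note that the cluster above still sets the deprecated top-level Replicas, yet creation fails once NodePools is nil, which suggests the create webhook now requires at least one node pool. A hedged sketch of such a check; the helper name and exact rule are assumptions, not the webhook's actual source:

// Hypothetical fragment of create-time validation (assumes the standard
// library errors package and the repo's v1alpha1 NodePoolSpec type).
func validateNodePools(nodePools []*v1alpha1.NodePoolSpec) error {
    if len(nodePools) == 0 {
        return errors.New("cluster must define at least one node pool")
    }
    return nil
}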
7 changes: 1 addition & 6 deletions src/go/k8s/api/vectorized/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

@@ -1042,6 +1042,11 @@ spec:
If specified, Redpanda Pod node selectors. For reference please visit
https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node
type: object
+removed:
+  description: |-
+    Removed is set internally to signal the reconciliation loop that this
+    nodepool has been removed from the spec
+  type: boolean
replicas:
description: Replicas determine how big the node pool will be.
format: int32
@@ -1522,6 +1527,10 @@ spec:
from the cluster and provides its ordinal number
format: int32
type: integer
+decommissioningPod:
+  description: Indicates the pod that hosts the node we are currently
+    decommissioning
+  type: string
nodePools:
additionalProperties:
properties:
20 changes: 3 additions & 17 deletions src/go/k8s/internal/controller/redpanda/cluster_controller.go
@@ -194,7 +194,8 @@ func (r *ClusterReconciler) Reconcile(
return ctrl.Result{}, fmt.Errorf("creating statefulset: %w", err)
}

-if vectorizedCluster.Status.CurrentReplicas >= 1 {
+// Current replicas are now aggregated across all node pools.
+if vectorizedCluster.GetCurrentReplicas() >= 1 {
if err = r.setPodNodeIDAnnotation(ctx, &vectorizedCluster, log, ar); err != nil {
log.Error(err, "setting pod node_id annotation")
}
@@ -228,12 +229,6 @@
return ctrl.Result{}, err
}

-npStatus := make(map[string]string)
-for _, sts := range stSets {
-rep := strconv.FormatInt(int64(sts.GetReplicas()), 10)
-npStatus[sts.Key().Name] = rep
-}
-
err = r.reportStatus(
ctx,
&vectorizedCluster,
@@ -665,19 +660,9 @@ func (r *ClusterReconciler) reportStatus(
readyReplicas += sts.LastObservedState.Status.ReadyReplicas
replicas += sts.LastObservedState.Status.Replicas

-pods, err := sts.GetNodePoolPods(ctx)
-if err != nil {
-return fmt.Errorf("while retrieving STS %s list of pods: %w", sts.LastObservedState.Name, err)
-}
-podNames := make([]string, 0)
-for _, pod := range pods.Items {
-podNames = append(podNames, pod.Name)
-}
-
nodePoolStatus[sts.Key().Name] = vectorizedv1alpha1.NodePoolStatus{
CurrentReplicas: sts.LastObservedState.Status.CurrentReplicas,
Replicas: sts.LastObservedState.Status.Replicas,
-Pods: podNames,
}
}

@@ -1084,6 +1069,7 @@ func (r *ClusterReconciler) doDecommissionGhostBrokers(c context.Context, vClust
if _, ok := actualBrokerIDs[broker.NodeID]; ok {
continue
}
log.Info("decommissioning ghost broker", "nodeid", broker.NodeID)
if err := adminClient.DecommissionBroker(ctx, broker.NodeID); err != nil {
return fmt.Errorf("failed to decommission ghost broker: %w", err)
}
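Alongside these controller changes, the new DecommissioningPod status field pairs with the existing DecommissioningNode ordinal, letting the operator remember which pod hosted the broker being drained. A sketch of how the two markers might be set and cleared around a decommission cycle; these helpers are illustrative assumptions, not code from this commit:

// Hypothetical helpers bracketing a decommission cycle.
func markDecommission(cluster *vectorizedv1alpha1.Cluster, nodeID *int32, podName string) {
    cluster.SetDecommissionBrokerID(nodeID)
    cluster.SetDecommissioningPod(podName)
}

func clearDecommission(cluster *vectorizedv1alpha1.Cluster) {
    cluster.SetDecommissionBrokerID(nil)
    cluster.SetDecommissioningPod("")
}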
@@ -414,8 +414,12 @@ func (a *attachedResources) statefulSet() error {
continue
}

+replicas := sts.Spec.Replicas
+if st, ok := a.cluster.Status.NodePools[sts.Name]; ok {
+replicas = &st.CurrentReplicas
+}
+
// Add the sts again in the items map in order for the reconciliation process to take place
-// Since the np spec was removed, the replicas number hardcoded to 0.
a.items[stsKey] = resources.NewStatefulSet(
a.reconciler.Client,
a.cluster,
@@ -434,7 +438,8 @@
a.reconciler.MetricsTimeout,
vectorizedv1alpha1.NodePoolSpec{
Name: npName,
-Replicas: sts.Spec.Replicas,
+Replicas: replicas,
+Removed: true,
})
}
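The net effect of this hunk: a node pool deleted from the spec is re-attached with Removed set to true, and its StatefulSet keeps the replica count last recorded in the cluster status rather than the spec value, so the pool can be drained down gradually instead of being deleted outright. The selection logic in isolation, with simplified types (a sketch under the assumption that Status.NodePools is keyed by pool name):

// Simplified: prefer the status-recorded size for a removed pool; otherwise
// keep whatever the live StatefulSet's spec reports.
func replicasForRemovedPool(stsReplicas *int32, status map[string]vectorizedv1alpha1.NodePoolStatus, pool string) *int32 {
    if st, ok := status[pool]; ok {
        return &st.CurrentReplicas
    }
    return stsReplicas
}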

3 changes: 1 addition & 2 deletions src/go/k8s/pkg/resources/statefulset.go
@@ -167,8 +167,6 @@ func NewStatefulSet(
// Ensure will manage kubernetes v1.StatefulSet for redpanda.vectorized.io custom resource
func (r *StatefulSetResource) Ensure(ctx context.Context) error {
log := r.logger.WithName("StatefulSetResource.Ensure").WithValues("nodepool", r.nodePool.Name)
-var sts appsv1.StatefulSet
-
if r.pandaCluster.ExternalListener() != nil {
err := r.Get(ctx, r.nodePortName, &r.nodePortSvc)
if err != nil {
@@ -196,6 +194,7 @@ func (r *StatefulSetResource) Ensure(ctx context.Context) error {
return nil
}

+var sts appsv1.StatefulSet
err = r.Get(ctx, r.Key(), &sts)
if err != nil {
return fmt.Errorf("error while fetching StatefulSet resource: %w", err)
