scaling down by removing a nodepool
also bugfixes and changes to some tests
koikonom committed Jul 11, 2024
1 parent bb4c9ae commit a353089
Showing 21 changed files with 394 additions and 161 deletions.
29 changes: 24 additions & 5 deletions src/go/k8s/api/vectorized/v1alpha1/cluster_types.go
@@ -206,6 +206,9 @@ type NodePoolSpec struct {
// assigned to these containers will be required on the cluster on top of
// the resources defined here
Resources RedpandaResourceRequirements `json:"resources"`
// Removed is set internally to signal the reconciliation loop that this nodepool
// has been removed from spec
Removed bool `json:"removed,omitempty"`
}

// RestartConfig contains strategies to configure how the cluster behaves when restarting, because of upgrades
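A minimal sketch of how a reconciliation loop might consume the new `Removed` marker, with pared-down stand-in types; every name below is illustrative, not part of this commit:

```go
package main

import "fmt"

// Pared-down stand-in for the NodePoolSpec above; only the fields
// relevant to scale-down are kept.
type NodePoolSpec struct {
	Name     string
	Replicas *int32
	Removed  bool
}

func main() {
	three := int32(3)
	zero := int32(0)
	pools := []*NodePoolSpec{
		{Name: "default", Replicas: &three},
		// A pool deleted from spec survives internally with Removed set,
		// so its brokers can be decommissioned instead of orphaned.
		{Name: "old-pool", Replicas: &zero, Removed: true},
	}
	for _, np := range pools {
		if np.Removed {
			fmt.Printf("draining removed pool %q down to zero\n", np.Name)
			continue
		}
		fmt.Printf("reconciling pool %q at %d replicas\n", np.Name, *np.Replicas)
	}
}
```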
@@ -433,6 +436,9 @@ type ClusterStatus struct {
// Indicates that a node is currently being decommissioned from the cluster and provides its ordinal number
// +optional
DecommissioningNode *int32 `json:"decommissioningNode,omitempty"`
// Indicates the pod that hosts the node we are currently decommissioning
// +optional
DecommissioningPod string `json:"decommissioningPod,omitempty"`
// Current version of the cluster.
// +optional
Version string `json:"version"`
@@ -687,9 +693,8 @@ type LoadBalancerStatus struct {
}

type NodePoolStatus struct {
CurrentReplicas int32 `json:"currentReplicas,omitempty"`
Replicas int32 `json:"replicas,omitempty"`
Pods []string `json:"pods,omitempty"`
CurrentReplicas int32 `json:"currentReplicas,omitempty"`
Replicas int32 `json:"replicas,omitempty"`
}

// KafkaAPITLS configures TLS for redpanda Kafka API
@@ -1191,11 +1196,17 @@ func (r *Cluster) GetCurrentReplicas() int32 {
if r == nil {
return 0
}
if r.Status.CurrentReplicas <= 0 {
var currentReplicas int32

for _, np := range r.Status.NodePools {
currentReplicas += np.CurrentReplicas
}

if currentReplicas <= 0 {
// Not initialized, let's give the computed value
return r.ComputeInitialCurrentReplicasField()
}
return r.Status.CurrentReplicas
return currentReplicas
}
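GetCurrentReplicas now sums the per-nodepool counters instead of reading one cluster-wide field. A self-contained sketch of that arithmetic, assuming Status.NodePools is the map keyed by pool name used elsewhere in this diff:

```go
package main

import "fmt"

// Pared-down copy of the NodePoolStatus shown above.
type NodePoolStatus struct {
	CurrentReplicas int32
	Replicas        int32
}

// currentReplicas mirrors the loop in GetCurrentReplicas: the
// cluster-wide count is the total across all node pools.
func currentReplicas(pools map[string]NodePoolStatus) int32 {
	var total int32
	for _, np := range pools {
		total += np.CurrentReplicas
	}
	return total
}

func main() {
	pools := map[string]NodePoolStatus{
		"default": {CurrentReplicas: 3, Replicas: 3},
		"extra":   {CurrentReplicas: 2, Replicas: 2},
	}
	fmt.Println(currentReplicas(pools)) // 5
}
```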

func (r *Cluster) GetReplicas() int32 {
@@ -1419,3 +1430,11 @@ func (r *Cluster) GetDecommissionBrokerID() *int32 {
func (r *Cluster) SetDecommissionBrokerID(id *int32) {
r.Status.DecommissioningNode = id
}

func (r *Cluster) GetDecommissioningPod() string {
return r.Status.DecommissioningPod
}

func (r *Cluster) SetDecommissioningPod(pod string) {
r.Status.DecommissioningPod = pod
}
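With several nodepools, an ordinal alone no longer identifies a unique pod (redpanda-default-3 and redpanda-extra-3 can coexist), which is presumably why the pod name is now tracked beside the broker ID. A sketch of the paired bookkeeping, using a stand-in status type and a hypothetical pod name:

```go
package main

import "fmt"

// Stand-in for the relevant slice of ClusterStatus.
type ClusterStatus struct {
	DecommissioningNode *int32
	DecommissioningPod  string
}

func main() {
	id := int32(3)
	status := ClusterStatus{
		DecommissioningNode: &id,
		DecommissioningPod:  "redpanda-extra-3", // hypothetical pod name
	}
	fmt.Printf("decommissioning broker %d hosted by pod %s\n",
		*status.DecommissioningNode, status.DecommissioningPod)
}
```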
14 changes: 12 additions & 2 deletions src/go/k8s/api/vectorized/v1alpha1/cluster_types_test.go
@@ -224,7 +224,12 @@ func TestInitialReplicas(t *testing.T) {
// backward compatibility. Remove when v22.2 is no longer supported.
cluster := v1alpha1.Cluster{}
cluster.Spec.Version = featuregates.V22_2_1.String()
cluster.Spec.Replicas = ptr.To(int32(3))
cluster.Spec.NodePools = []*v1alpha1.NodePoolSpec{
{
Name: "test",
Replicas: ptr.To(int32(3)),
},
}
assert.Equal(t, int32(1), cluster.GetCurrentReplicas())
cluster.Status.Replicas = 2
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
@@ -239,7 +244,12 @@ func TestInitialReplicas(t *testing.T) {

// test with latest version
cluster = v1alpha1.Cluster{}
cluster.Spec.Replicas = ptr.To(int32(3))
cluster.Spec.NodePools = []*v1alpha1.NodePoolSpec{
{
Name: "test",
Replicas: ptr.To(int32(3)),
},
}
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
cluster.Status.Replicas = 2
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
23 changes: 17 additions & 6 deletions src/go/k8s/api/vectorized/v1alpha1/cluster_webhook_test.go
@@ -316,7 +316,7 @@ func TestNilReplicasIsNotAllowed(t *testing.T) {
rpCluster := validRedpandaCluster()
_, err := rpCluster.ValidateCreate()
require.Nil(t, err, "Initial cluster is not valid")
rpCluster.Spec.Replicas = nil
rpCluster.Spec.NodePools = nil
_, err = rpCluster.ValidateCreate()
assert.Error(t, err)
}
@@ -331,7 +331,6 @@ func TestValidateUpdate_NoError(t *testing.T) {
Namespace: "",
},
Spec: v1alpha1.ClusterSpec{
Replicas: ptr.To(replicas2),
Configuration: v1alpha1.RedpandaConfig{
KafkaAPI: []v1alpha1.KafkaAPI{{Port: 124, AuthenticationMethod: "none"}},
AdminAPI: []v1alpha1.AdminAPI{{Port: 125}},
@@ -348,6 +347,12 @@ func TestValidateUpdate_NoError(t *testing.T) {
},
Redpanda: nil,
},
NodePools: []*v1alpha1.NodePoolSpec{
{
Name: "test",
Replicas: ptr.To(replicas2),
},
},
},
}

@@ -357,7 +362,7 @@ func TestValidateUpdate_NoError(t *testing.T) {
})

t.Run("scale up", func(t *testing.T) {
scaleUp := *redpandaCluster.Spec.Replicas + 1
scaleUp := *redpandaCluster.Spec.NodePools[0].Replicas + 1
updatedScaleUp := redpandaCluster.DeepCopy()
updatedScaleUp.Spec.Replicas = &scaleUp
_, err := updatedScaleUp.ValidateUpdate(redpandaCluster)
@@ -723,17 +728,17 @@ func TestValidateUpdate_NoError(t *testing.T) {
for _, tc := range decreaseCases {
t.Run(fmt.Sprintf("CPU request change from %s to %s", tc.initial, tc.target), func(t *testing.T) {
oldCluster := redpandaCluster.DeepCopy()
oldCluster.Spec.Resources.Requests = corev1.ResourceList{
oldCluster.Spec.NodePools[0].Resources.Requests = corev1.ResourceList{
corev1.ResourceMemory: resource.MustParse("20Gi"),
corev1.ResourceCPU: resource.MustParse(tc.initial),
}
oldCluster.Spec.Resources.Limits = corev1.ResourceList{
oldCluster.Spec.NodePools[0].Resources.Limits = corev1.ResourceList{
corev1.ResourceMemory: resource.MustParse("20Gi"),
corev1.ResourceCPU: resource.MustParse(tc.initial),
}

newCluster := redpandaCluster.DeepCopy()
newCluster.Spec.Resources.Requests = corev1.ResourceList{
newCluster.Spec.NodePools[0].Resources.Requests = corev1.ResourceList{
corev1.ResourceMemory: resource.MustParse("20Gi"),
corev1.ResourceCPU: resource.MustParse(tc.target),
}
@@ -1960,6 +1965,12 @@ func validRedpandaCluster() *v1alpha1.Cluster {
Namespace: "",
},
Spec: v1alpha1.ClusterSpec{
NodePools: []*v1alpha1.NodePoolSpec{
{
Name: "test",
Replicas: ptr.To(int32(3)),
},
},
Replicas: ptr.To(int32(1)),
Configuration: v1alpha1.RedpandaConfig{
KafkaAPI: []v1alpha1.KafkaAPI{{Port: 124, AuthenticationMethod: "none"}},
7 changes: 1 addition & 6 deletions src/go/k8s/api/vectorized/v1alpha1/zz_generated.deepcopy.go

(Generated file; diff not rendered.)

@@ -1042,6 +1042,11 @@ spec:
If specified, Redpanda Pod node selectors. For reference please visit
https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node
type: object
removed:
description: |-
Removed is set internally to signal the reconciliation loop that this nodepool
has been removed from spec
type: boolean
replicas:
description: Replicas determine how big the node pool will be.
format: int32
@@ -1522,6 +1527,10 @@ spec:
from the cluster and provides its ordinal number
format: int32
type: integer
decommissioningPod:
description: Indicates the pod that hosts the node we are currently
decommissioning
type: string
nodePools:
additionalProperties:
properties:
41 changes: 14 additions & 27 deletions src/go/k8s/internal/controller/redpanda/cluster_controller.go
@@ -194,7 +194,7 @@ func (r *ClusterReconciler) Reconcile(
return ctrl.Result{}, fmt.Errorf("creating statefulset: %w", err)
}

if vectorizedCluster.Status.CurrentReplicas >= 1 {
if vectorizedCluster.GetCurrentReplicas() >= 1 {
if err = r.setPodNodeIDAnnotation(ctx, &vectorizedCluster, log, ar); err != nil {
log.Error(err, "setting pod node_id annotation")
}
@@ -228,12 +228,6 @@ func (r *ClusterReconciler) Reconcile(
return ctrl.Result{}, err
}

npStatus := make(map[string]string)
for _, sts := range stSets {
rep := strconv.FormatInt(int64(sts.GetReplicas()), 10)
npStatus[sts.Key().Name] = rep
}

err = r.reportStatus(
ctx,
&vectorizedCluster,
@@ -644,17 +638,14 @@ func (r *ClusterReconciler) reportStatus(
nodeList.Internal = observedNodesInternal
nodeList.SchemaRegistry.Internal = fmt.Sprintf("%s:%d", clusterFQDN, schemaRegistryPort)

//nolint:nestif // the code won't get clearer if it's split out in my opinion
version, versionErr := stSets[0].CurrentVersion(ctx)
if versionErr != nil {
// this is non-fatal error, it will return error even if e.g.
// the rollout is not finished because then the currentversion
// of the cluster cannot be determined
r.Log.Info(fmt.Sprintf("cannot get CurrentVersion of statefulset, %s", versionErr))
if len(stSets) == 0 {
r.Log.Info("no stateful sets found")
}

var version string
var versionErr error

nodePoolStatus := make(map[string]vectorizedv1alpha1.NodePoolStatus)
readyReplicas := int32(0)
replicas := int32(0)
for _, sts := range stSets {
@@ -665,22 +656,17 @@ func (r *ClusterReconciler) reportStatus(
readyReplicas += sts.LastObservedState.Status.ReadyReplicas
replicas += sts.LastObservedState.Status.Replicas

pods, err := sts.GetNodePoolPods(ctx)
if err != nil {
return fmt.Errorf("while retrieving STS %s list of pods: %w", sts.LastObservedState.Name, err)
}
podNames := make([]string, 0)
for _, pod := range pods.Items {
podNames = append(podNames, pod.Name)
}

nodePoolStatus[sts.Key().Name] = vectorizedv1alpha1.NodePoolStatus{
CurrentReplicas: sts.LastObservedState.Status.CurrentReplicas,
Replicas: sts.LastObservedState.Status.Replicas,
Pods: podNames,
// We do not update current replicas here.
CurrentReplicas: redpandaCluster.Status.NodePools[sts.Key().Name].CurrentReplicas,
Replicas: sts.GetReplicas(),
}
version, versionErr = sts.CurrentVersion(ctx)
}

if versionErr != nil || version == "" {
r.Log.Info(fmt.Sprintf("cannot get CurrentVersion of statefulset, %s", versionErr))
}
if !statusShouldBeUpdated(&redpandaCluster.Status, nodeList, replicas, readyReplicas, version, versionErr, nodePoolStatus) {
return nil
}
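The loop above builds one NodePoolStatus per StatefulSet while summing ready and total replicas across pools; version keeps whatever the last pool reported, and CurrentReplicas is deliberately carried over unchanged, as the comment in the diff notes. A condensed, client-free sketch of that aggregation with stand-in types and sample data:

```go
package main

import "fmt"

// Stand-ins for the observed StatefulSet state and the per-pool status.
type observedSTS struct {
	Name          string
	ReadyReplicas int32
	Replicas      int32
	Version       string
}

type NodePoolStatus struct {
	CurrentReplicas int32
	Replicas        int32
}

func main() {
	stSets := []observedSTS{
		{Name: "default", ReadyReplicas: 3, Replicas: 3, Version: "v23.2.1"},
		{Name: "extra", ReadyReplicas: 1, Replicas: 2, Version: "v23.2.1"},
	}
	// Previously recorded state; CurrentReplicas is carried over untouched.
	prior := map[string]NodePoolStatus{
		"default": {CurrentReplicas: 3},
		"extra":   {CurrentReplicas: 2},
	}

	var ready, total int32
	var version string
	pools := make(map[string]NodePoolStatus, len(stSets))
	for _, sts := range stSets {
		ready += sts.ReadyReplicas
		total += sts.Replicas
		pools[sts.Name] = NodePoolStatus{
			CurrentReplicas: prior[sts.Name].CurrentReplicas,
			Replicas:        sts.Replicas,
		}
		version = sts.Version // last pool wins, as in the diff
	}
	fmt.Println(ready, total, version, pools)
}
```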
@@ -1084,6 +1070,7 @@ func (r *ClusterReconciler) doDecommissionGhostBrokers(c context.Context, vClust
if _, ok := actualBrokerIDs[broker.NodeID]; ok {
continue
}
log.Info("decommissioning ghost broker", "nodeid", broker.NodeID)
if err := adminClient.DecommissionBroker(ctx, broker.NodeID); err != nil {
return fmt.Errorf("failed to decommission ghost broker: %w", err)
}
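A ghost broker is one the admin API still reports but that no live pod accounts for; the loop above logs and decommissions each such ID. The membership test in isolation, as a sketch with simplified types and no admin client:

```go
package main

import "fmt"

type broker struct{ NodeID int }

// ghostBrokers returns the IDs reported by the cluster that no
// currently known pod accounts for.
func ghostBrokers(reported []broker, actual map[int]struct{}) []int {
	var ghosts []int
	for _, b := range reported {
		if _, ok := actual[b.NodeID]; ok {
			continue
		}
		ghosts = append(ghosts, b.NodeID)
	}
	return ghosts
}

func main() {
	reported := []broker{{NodeID: 0}, {NodeID: 1}, {NodeID: 7}}
	actual := map[int]struct{}{0: {}, 1: {}}
	fmt.Println(ghostBrokers(reported, actual)) // [7]
}
```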
@@ -12,6 +12,8 @@ import (
"github.com/redpanda-data/redpanda-operator/src/go/k8s/pkg/resources"
"github.com/redpanda-data/redpanda-operator/src/go/k8s/pkg/resources/certmanager"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
)
@@ -414,8 +416,35 @@ func (a *attachedResources) statefulSet() error {
continue
}

replicas := sts.Spec.Replicas
if st, ok := a.cluster.Status.NodePools[sts.Name]; ok {
replicas = &st.CurrentReplicas
}

var redpandaContainer *corev1.Container
for _, container := range sts.Spec.Template.Spec.Containers {
if container.Name == "redpanda" {
redpandaContainer = &container
break
}
}
if redpandaContainer == nil {
return fmt.Errorf("redpanda container not defined in STS %s template", sts.Name)
}

var vcCapacity resource.Quantity
var vcStorageClassName string
for _, vct := range sts.Spec.VolumeClaimTemplates {
if vct.Name != "datadir" {
continue
}
vcCapacity = vct.Spec.Resources.Requests[corev1.ResourceStorage]
if vct.Spec.StorageClassName != nil {
vcStorageClassName = *vct.Spec.StorageClassName
}
}

// Add the STS back into the items map so the reconciliation process still handles it.
// Since the nodepool spec was removed, carry over the last observed replica count so the pool can be drained gradually rather than dropped outright.
a.items[stsKey] = resources.NewStatefulSet(
a.reconciler.Client,
a.cluster,
Expand All @@ -432,9 +461,20 @@ func (a *attachedResources) statefulSet() error {
a.reconciler.DecommissionWaitInterval,
a.log,
a.reconciler.MetricsTimeout,

vectorizedv1alpha1.NodePoolSpec{
Name: npName,
Replicas: sts.Spec.Replicas,
Replicas: replicas,
Removed: true,
Resources: vectorizedv1alpha1.RedpandaResourceRequirements{
ResourceRequirements: redpandaContainer.Resources,
},
Tolerations: sts.Spec.Template.Spec.Tolerations,
NodeSelector: sts.Spec.Template.Spec.NodeSelector,
Storage: vectorizedv1alpha1.StorageSpec{
Capacity: vcCapacity,
StorageClassName: vcStorageClassName,
},
})
}

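When a pool vanishes from spec, the code above rebuilds just enough of a NodePoolSpec from the orphaned StatefulSet: replicas from status, resources from the redpanda container, and storage from the datadir volume claim template. The claim-template lookup in isolation, a sketch against the stock k8s.io types (assumption: the Resources field is VolumeResourceRequirements on k8s.io/api >= 0.29; older releases call it ResourceRequirements):

```go
package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// datadirStorage pulls capacity and storage class out of the "datadir"
// volume claim template, mirroring the loop in the diff above.
func datadirStorage(sts *appsv1.StatefulSet) (resource.Quantity, string) {
	var capacity resource.Quantity
	var class string
	for _, vct := range sts.Spec.VolumeClaimTemplates {
		if vct.Name != "datadir" {
			continue
		}
		capacity = vct.Spec.Resources.Requests[corev1.ResourceStorage]
		if vct.Spec.StorageClassName != nil {
			class = *vct.Spec.StorageClassName
		}
	}
	return capacity, class
}

func main() {
	class := "fast-ssd" // hypothetical storage class
	sts := &appsv1.StatefulSet{}
	sts.Spec.VolumeClaimTemplates = []corev1.PersistentVolumeClaim{{
		ObjectMeta: metav1.ObjectMeta{Name: "datadir"},
		Spec: corev1.PersistentVolumeClaimSpec{
			StorageClassName: &class,
			Resources: corev1.VolumeResourceRequirements{
				Requests: corev1.ResourceList{
					corev1.ResourceStorage: resource.MustParse("100Gi"),
				},
			},
		},
	}}
	qty, sc := datadirStorage(sts)
	fmt.Println(qty.String(), sc) // 100Gi fast-ssd
}
```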
@@ -520,6 +520,7 @@ var _ = Describe("RedpandaCluster configuration controller", func() {
testutils.DeleteAllInNamespace(testEnv, k8sClient, namespace)
})

// KO: YOU ARE HERE: fix this test
It("Should be able to upgrade and change a cluster even if the admin API is unavailable", func() {
testAdminAPI.SetUnavailable(true)

Expand Down