Nodepools #167

Closed · wants to merge 7 commits

182 changes: 178 additions & 4 deletions src/go/k8s/api/vectorized/v1alpha1/cluster_types.go
@@ -10,16 +10,20 @@
package v1alpha1

import (
"context"
"fmt"
"net"
"net/url"
"strings"
"time"

cmmeta "github.com/cert-manager/cert-manager/pkg/apis/meta/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/redpanda-data/redpanda-operator/src/go/k8s/pkg/resources/featuregates"
)
@@ -116,7 +120,7 @@ type ClusterSpec struct {
// containers have separate resources settings and the amount of resources
// assigned to these containers will be required on the cluster on top of
// the resources defined here
Resources RedpandaResourceRequirements `json:"resources"`
Resources RedpandaResourceRequirements `json:"resources,omitempty"`
// Sidecars is a list of sidecars run alongside the redpanda container
Sidecars Sidecars `json:"sidecars,omitempty"`
// Configuration represent redpanda specific configuration
@@ -183,6 +187,125 @@ type ClusterSpec struct {

// The name of the ServiceAccount to be used by the Redpanda pods
ServiceAccount *string `json:"serviceAccount,omitempty"`

NodePools []*NodePoolSpec `json:"nodePools,omitempty"`
}

func (c *Cluster) GetNodePools() []*NodePoolSpec {
out := make([]*NodePoolSpec, 0)
if c.Spec.Replicas != nil && *c.Spec.Replicas > 0 {
np := &NodePoolSpec{
Name: "redpanda__imported",
Replicas: c.Spec.Replicas,
Tolerations: c.Spec.Tolerations,
NodeSelector: c.Spec.NodeSelector,
Storage: c.Spec.Storage,
Resources: c.Spec.Resources,
}
out = append(out, np)
}

for _, np := range c.Spec.NodePools {
if np == nil {
continue
}
out = append(out, np)
}

return out
}

func (c *Cluster) GetNodePoolsWithRemoved(ctx context.Context, k8sClient client.Reader) ([]*NodePoolSpec, error) {
nps := c.GetNodePools()

// If a node pool has been removed from the spec, we need to recreate it so that the decommission process can kick in
var stsList appsv1.StatefulSetList
err := k8sClient.List(ctx, &stsList)
if err != nil {
return nil, err
}
for _, sts := range stsList.Items {
if !strings.HasPrefix(sts.Name, c.Name) {
continue
}

var npName string
if strings.EqualFold(c.Name, sts.Name) {
npName = "redpanda__imported"
} else {
npName = sts.Name[len(c.Name)+1:]
}

replicas := sts.Spec.Replicas
if st, ok := c.Status.NodePools[sts.Name]; ok {
replicas = &st.CurrentReplicas
}

var redpandaContainer *corev1.Container
for i := range sts.Spec.Template.Spec.Containers {
if sts.Spec.Template.Spec.Containers[i].Name == "redpanda" {
redpandaContainer = &sts.Spec.Template.Spec.Containers[i]
break
}
}
if redpandaContainer == nil {
return nil, fmt.Errorf("redpanda container not defined in STS %s template", sts.Name)
}

var vcCapacity resource.Quantity
var vcStorageClassName string
for _, vct := range sts.Spec.VolumeClaimTemplates {
if vct.Name != "datadir" {
continue
}
vcCapacity = vct.Spec.Resources.Requests[corev1.ResourceStorage]
if vct.Spec.StorageClassName != nil {
vcStorageClassName = *vct.Spec.StorageClassName
}
}

np := NodePoolSpec{
Name: npName,
Replicas: replicas,
Removed: true,
Resources: RedpandaResourceRequirements{
ResourceRequirements: redpandaContainer.Resources,
},
Tolerations: sts.Spec.Template.Spec.Tolerations,
NodeSelector: sts.Spec.Template.Spec.NodeSelector,
Storage: StorageSpec{
Capacity: vcCapacity,
StorageClassName: vcStorageClassName,
},
}

nps = append(nps, &np)
}
return nps, nil
}

type NodePoolSpec struct {
Name string `json:"name"`
// Replicas determines how many Redpanda pods the node pool runs.
// +kubebuilder:validation:Minimum=0
Replicas *int32 `json:"replicas,omitempty"`
// If specified, Redpanda Pod tolerations
Tolerations []corev1.Toleration `json:"tolerations,omitempty"`
// If specified, Redpanda Pod node selectors. For reference please visit
// https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
// Storage spec for the node pool
Storage StorageSpec `json:"storage,omitempty"`
// Resources used by redpanda process running in container. Beware that
// there are multiple containers running in the redpanda pod and these can
// be enabled/disabled and configured from the `sidecars` field. These
// containers have separate resources settings and the amount of resources
// assigned to these containers will be required on the cluster on top of
// the resources defined here
Resources RedpandaResourceRequirements `json:"resources"`
// Removed is set internally to signal the reconciliation loop that this nodepool
// has been removed from spec
Removed bool `json:"removed,omitempty"`
}

// RestartConfig contains strategies to configure how the cluster behaves when restarting, because of upgrades
@@ -410,12 +533,17 @@ type ClusterStatus struct {
// Indicates that a node is currently being decommissioned from the cluster and provides its ordinal number
// +optional
DecommissioningNode *int32 `json:"decommissioningNode,omitempty"`
// Indicates the pod that hosts the node we are currently decommissioning
// +optional
DecommissioningPod string `json:"decommissioningPod,omitempty"`
// Current version of the cluster.
// +optional
Version string `json:"version"`
// Current state of the cluster.
// +optional
Conditions []ClusterCondition `json:"conditions,omitempty"`
// Replicas per Nodepool
NodePools map[string]NodePoolStatus `json:"nodePools,omitempty"`
}

// ClusterCondition contains details for the current conditions of the cluster
@@ -661,6 +789,12 @@ type LoadBalancerStatus struct {
corev1.LoadBalancerStatus `json:""`
}

type NodePoolStatus struct {
CurrentReplicas int32 `json:"currentReplicas,omitempty"`
Replicas int32 `json:"replicas,omitempty"`
ReadyReplicas int32 `json:"readyReplicas,omitempty"`
}

// KafkaAPITLS configures TLS for redpanda Kafka API
//
// If Enabled is set to true, one-way TLS verification is enabled.
@@ -1160,11 +1294,35 @@ func (r *Cluster) GetCurrentReplicas() int32 {
if r == nil {
return 0
}
if r.Status.CurrentReplicas <= 0 {
var currentReplicas int32

for _, np := range r.Status.NodePools {
currentReplicas += np.CurrentReplicas
}

if currentReplicas <= 0 {
// Not initialized, let's give the computed value
return r.ComputeInitialCurrentReplicasField()
}
return r.Status.CurrentReplicas
return currentReplicas
}

func (r *Cluster) GetReplicas() int32 {
if r == nil {
return 0
}

nps := r.GetNodePools()
if len(nps) == 0 {
return 0
}

replicas := int32(0)

for _, np := range nps {
if np == nil || np.Replicas == nil {
continue
}
replicas += *np.Replicas
}

return replicas
}

// ComputeInitialCurrentReplicasField calculates the initial value for status.currentReplicas.
@@ -1177,9 +1335,17 @@ func (r *Cluster) ComputeInitialCurrentReplicasField() int32 {
if r == nil {
return 0
}
nps := r.GetNodePools()
sum := int32(0)
if r.Status.Replicas > 1 || r.Status.ReadyReplicas > 1 || len(r.Status.Nodes.Internal) > 1 || featuregates.EmptySeedStartCluster(r.Spec.Version) {
// A cluster seems to be already running, we start from the existing amount of replicas
return *r.Spec.Replicas
for _, np := range nps {
if np == nil || np.Replicas == nil {
continue
}
sum += *np.Replicas
}
return sum
}

// Clusters start from a single replica, then upscale
@@ -1362,3 +1528,11 @@ func (r *Cluster) GetDecommissionBrokerID() *int32 {
func (r *Cluster) SetDecommissionBrokerID(id *int32) {
r.Status.DecommissioningNode = id
}

func (r *Cluster) GetDecommissioningPod() string {
return r.Status.DecommissioningPod
}

func (r *Cluster) SetDecommissioningPod(pod string) {
r.Status.DecommissioningPod = pod
}
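
A minimal sketch (not from this PR) of how the legacy spec.replicas field and the new nodePools list are expected to combine, based on GetNodePools and GetReplicas above. The import path and the ptr helper are assumed from the repository layout and the tests in this diff; the "high-mem" pool name is invented.

package main

import (
	"fmt"

	"k8s.io/utils/ptr"

	v1alpha1 "github.com/redpanda-data/redpanda-operator/src/go/k8s/api/vectorized/v1alpha1"
)

func main() {
	cluster := &v1alpha1.Cluster{}

	// Legacy field: GetNodePools folds it into an implicit pool named "redpanda__imported".
	cluster.Spec.Replicas = ptr.To(int32(3))

	// New field: explicit node pools.
	cluster.Spec.NodePools = []*v1alpha1.NodePoolSpec{
		{Name: "high-mem", Replicas: ptr.To(int32(2))},
	}

	for _, np := range cluster.GetNodePools() {
		fmt.Printf("pool %q -> %d replicas\n", np.Name, *np.Replicas)
	}
	// pool "redpanda__imported" -> 3 replicas
	// pool "high-mem" -> 2 replicas

	fmt.Println("total:", cluster.GetReplicas()) // total: 5
}
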
14 changes: 12 additions & 2 deletions src/go/k8s/api/vectorized/v1alpha1/cluster_types_test.go
@@ -224,7 +224,12 @@ func TestInitialReplicas(t *testing.T) {
// backward compatibility. Remove when v22.2 is no longer supported.
cluster := v1alpha1.Cluster{}
cluster.Spec.Version = featuregates.V22_2_1.String()
cluster.Spec.Replicas = ptr.To(int32(3))
cluster.Spec.NodePools = []*v1alpha1.NodePoolSpec{
{
Name: "test",
Replicas: ptr.To(int32(3)),
},
}
assert.Equal(t, int32(1), cluster.GetCurrentReplicas())
cluster.Status.Replicas = 2
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
@@ -239,7 +244,12 @@ func TestInitialReplicas(t *testing.T) {

// test with latest version
cluster = v1alpha1.Cluster{}
cluster.Spec.Replicas = ptr.To(int32(3))
cluster.Spec.NodePools = []*v1alpha1.NodePoolSpec{
{
Name: "test",
Replicas: ptr.To(int32(3)),
},
}
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
cluster.Status.Replicas = 2
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
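
To make the behaviour exercised above concrete, a small sketch (not from this PR) of the status-side aggregation: GetCurrentReplicas sums the per-pool currentReplicas values in status.nodePools and falls back to ComputeInitialCurrentReplicasField when no per-pool status has been reported yet. The map keys are StatefulSet names; the names and import path below are assumptions.

package main

import (
	"fmt"

	"k8s.io/utils/ptr"

	v1alpha1 "github.com/redpanda-data/redpanda-operator/src/go/k8s/api/vectorized/v1alpha1"
)

func main() {
	cluster := v1alpha1.Cluster{}
	cluster.Spec.NodePools = []*v1alpha1.NodePoolSpec{
		{Name: "first", Replicas: ptr.To(int32(3))},
	}

	// No per-pool status yet: falls back to ComputeInitialCurrentReplicasField,
	// matching the "test with latest version" case above.
	fmt.Println(cluster.GetCurrentReplicas()) // 3

	// Once the controller reports per-pool status (keyed by StatefulSet name),
	// the currentReplicas values are summed.
	cluster.Status.NodePools = map[string]v1alpha1.NodePoolStatus{
		"cluster-a":       {CurrentReplicas: 3},
		"cluster-a-first": {CurrentReplicas: 2},
	}
	fmt.Println(cluster.GetCurrentReplicas()) // 5
}
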
21 changes: 14 additions & 7 deletions src/go/k8s/api/vectorized/v1alpha1/cluster_webhook.go
@@ -270,16 +270,23 @@ func (r *Cluster) validateCommon(log logr.Logger) field.ErrorList {
}

func (r *Cluster) validateScaling() field.ErrorList {
replicas := r.GetReplicas()

// TODO: Validation
// Make sure changes to the cluster size are supported
// Make sure that CPU changes are supported
// Make sure only one change to a node pool takes place at a time,
// i.e. don't remove one pool and add another in the same update.
var allErrs field.ErrorList
if r.Spec.Replicas == nil {
if replicas == 0 {
allErrs = append(allErrs,
field.Invalid(field.NewPath("spec").Child("replicas"),
r.Spec.Replicas,
"replicas must be specified explicitly"))
} else if *r.Spec.Replicas <= 0 {
field.Invalid(field.NewPath("spec").Child("nodePools"),
replicas,
"replicas must be specified explicitly in each nodepool"))
} else if replicas <= 0 {
allErrs = append(allErrs,
field.Invalid(field.NewPath("spec").Child("replicas"),
r.Spec.Replicas,
field.Invalid(field.NewPath("spec").Child("nodePools"),
replicas,
"downscaling is not allowed to less than 1 instance"))
}

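A simplified, standalone restatement (illustrative only, not the webhook code) of the new scaling rule: the value being validated is the total returned by Cluster.GetReplicas(), i.e. the replica count summed across all node pools.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/validation/field"
)

// validateScalingSketch mirrors the aggregate check above; replicas is assumed
// to be the result of Cluster.GetReplicas().
func validateScalingSketch(replicas int32) field.ErrorList {
	var allErrs field.ErrorList
	if replicas == 0 {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("nodePools"), replicas,
				"replicas must be specified explicitly in each nodepool"))
	} else if replicas < 0 {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("nodePools"), replicas,
				"downscaling is not allowed to less than 1 instance"))
	}
	return allErrs
}

func main() {
	fmt.Println(len(validateScalingSketch(0))) // 1 error: no replicas specified in any pool
	fmt.Println(len(validateScalingSketch(5))) // 0 errors
}
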
23 changes: 17 additions & 6 deletions src/go/k8s/api/vectorized/v1alpha1/cluster_webhook_test.go
@@ -316,7 +316,7 @@ func TestNilReplicasIsNotAllowed(t *testing.T) {
rpCluster := validRedpandaCluster()
_, err := rpCluster.ValidateCreate()
require.Nil(t, err, "Initial cluster is not valid")
rpCluster.Spec.Replicas = nil
rpCluster.Spec.NodePools = nil
_, err = rpCluster.ValidateCreate()
assert.Error(t, err)
}
@@ -331,7 +331,6 @@ func TestValidateUpdate_NoError(t *testing.T) {
Namespace: "",
},
Spec: v1alpha1.ClusterSpec{
Replicas: ptr.To(replicas2),
Configuration: v1alpha1.RedpandaConfig{
KafkaAPI: []v1alpha1.KafkaAPI{{Port: 124, AuthenticationMethod: "none"}},
AdminAPI: []v1alpha1.AdminAPI{{Port: 125}},
@@ -348,6 +347,12 @@ func TestValidateUpdate_NoError(t *testing.T) {
},
Redpanda: nil,
},
NodePools: []*v1alpha1.NodePoolSpec{
{
Name: "test",
Replicas: ptr.To(replicas2),
},
},
},
}

@@ -357,7 +362,7 @@ func TestValidateUpdate_NoError(t *testing.T) {
})

t.Run("scale up", func(t *testing.T) {
scaleUp := *redpandaCluster.Spec.Replicas + 1
scaleUp := *redpandaCluster.Spec.NodePools[0].Replicas + 1
updatedScaleUp := redpandaCluster.DeepCopy()
updatedScaleUp.Spec.Replicas = &scaleUp
_, err := updatedScaleUp.ValidateUpdate(redpandaCluster)
@@ -723,17 +728,17 @@ func TestValidateUpdate_NoError(t *testing.T) {
for _, tc := range decreaseCases {
t.Run(fmt.Sprintf("CPU request change from %s to %s", tc.initial, tc.target), func(t *testing.T) {
oldCluster := redpandaCluster.DeepCopy()
oldCluster.Spec.Resources.Requests = corev1.ResourceList{
oldCluster.Spec.NodePools[0].Resources.Requests = corev1.ResourceList{
corev1.ResourceMemory: resource.MustParse("20Gi"),
corev1.ResourceCPU: resource.MustParse(tc.initial),
}
oldCluster.Spec.Resources.Limits = corev1.ResourceList{
oldCluster.Spec.NodePools[0].Resources.Limits = corev1.ResourceList{
corev1.ResourceMemory: resource.MustParse("20Gi"),
corev1.ResourceCPU: resource.MustParse(tc.initial),
}

newCluster := redpandaCluster.DeepCopy()
newCluster.Spec.Resources.Requests = corev1.ResourceList{
newCluster.Spec.NodePools[0].Resources.Requests = corev1.ResourceList{
corev1.ResourceMemory: resource.MustParse("20Gi"),
corev1.ResourceCPU: resource.MustParse(tc.target),
}
@@ -1960,6 +1965,12 @@ func validRedpandaCluster() *v1alpha1.Cluster {
Namespace: "",
},
Spec: v1alpha1.ClusterSpec{
NodePools: []*v1alpha1.NodePoolSpec{
{
Name: "test",
Replicas: ptr.To(int32(3)),
},
},
Replicas: ptr.To(int32(1)),
Configuration: v1alpha1.RedpandaConfig{
KafkaAPI: []v1alpha1.KafkaAPI{{Port: 124, AuthenticationMethod: "none"}},
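
Across the files above, the removed-pool handling relies on the StatefulSet naming convention read back by GetNodePoolsWithRemoved in cluster_types.go. A small sketch of that derivation (not from this PR; the cluster and pool names are invented):

package main

import (
	"fmt"
	"strings"
)

// poolNameFromSTS reproduces the name derivation GetNodePoolsWithRemoved uses to
// rebuild a node pool that was removed from the spec but still has a StatefulSet,
// so that decommissioning can proceed.
func poolNameFromSTS(clusterName, stsName string) string {
	if strings.EqualFold(clusterName, stsName) {
		// The StatefulSet named after the cluster itself is the legacy pool.
		return "redpanda__imported"
	}
	// Per-pool StatefulSets are named "<cluster-name>-<pool-name>".
	return stsName[len(clusterName)+1:]
}

func main() {
	fmt.Println(poolNameFromSTS("cluster-a", "cluster-a"))          // redpanda__imported
	fmt.Println(poolNameFromSTS("cluster-a", "cluster-a-high-mem")) // high-mem
}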