Skip to content
This repository has been archived by the owner on Apr 4, 2023. It is now read-only.

Allow configuring number of seed nodes per nodepool #264

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions docs/quick-start/cassandra-cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ spec:
nodePools:
- name: "ringnodes"
replicas: 3
seeds: 2
datacenter: "demo-datacenter"
rack: "demo-rack"
persistence:
Expand Down
44 changes: 31 additions & 13 deletions hack/e2e.sh
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,15 @@ if [[ "test_elasticsearchcluster" = "${TEST_PREFIX}"* ]]; then
kube_delete_namespace_and_wait "${ES_TEST_NS}"
fi

function apply_cassandracluster() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❔ The other bash functions have parameters (of sorts) so it might be nice to be consistent. On the other hand it looks like we're moving away from bash based E2E tests to happy for you to leave this for now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And it'd be nice-to-have a check that all the environment variables that are about to be substituted are actually set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's fix these e2e things when we move to ginkgo

kubectl apply \
--namespace "${namespace}" \
--filename \
<(envsubst \
'$NAVIGATOR_IMAGE_REPOSITORY:$NAVIGATOR_IMAGE_TAG:$NAVIGATOR_IMAGE_PULLPOLICY:$CASS_NAME:$CASS_REPLICAS:$CASS_VERSION:$CASS_SEEDS' \
< "${SCRIPT_DIR}/testdata/cass-cluster-test.template.yaml")
}

function test_cassandracluster() {
echo "Testing CassandraCluster"
local namespace="${1}"
Expand All @@ -230,6 +239,7 @@ function test_cassandracluster() {
export CASS_REPLICAS=1
export CASS_CQL_PORT=9042
export CASS_VERSION="3.11.1"
export CASS_SEEDS=1

kube_create_namespace_with_quota "${namespace}"

Expand All @@ -239,12 +249,7 @@ function test_cassandracluster() {
fail_test "Failed to get cassandraclusters"
fi

if ! kubectl apply \
--namespace "${namespace}" \
--filename \
<(envsubst \
'$NAVIGATOR_IMAGE_REPOSITORY:$NAVIGATOR_IMAGE_TAG:$NAVIGATOR_IMAGE_PULLPOLICY:$CASS_NAME:$CASS_REPLICAS:$CASS_VERSION' \
< "${SCRIPT_DIR}/testdata/cass-cluster-test.template.yaml")
if ! apply_cassandracluster
then
fail_test "Failed to create cassandracluster"
fi
Expand Down Expand Up @@ -338,12 +343,10 @@ function test_cassandracluster() {

# Increment the replica count
export CASS_REPLICAS=2
kubectl apply \
--namespace "${namespace}" \
--filename \
<(envsubst \
'$NAVIGATOR_IMAGE_REPOSITORY:$NAVIGATOR_IMAGE_TAG:$NAVIGATOR_IMAGE_PULLPOLICY:$CASS_NAME:$CASS_REPLICAS:$CASS_VERSION' \
< "${SCRIPT_DIR}/testdata/cass-cluster-test.template.yaml")
if ! apply_cassandracluster
then
fail_test "Failed to apply cassandracluster"
fi

if ! retry TIMEOUT=300 stdout_equals 2 kubectl \
--namespace "${namespace}" \
Expand All @@ -357,7 +360,7 @@ function test_cassandracluster() {
# TODO: A better test would be to query the endpoints and check that only
# the `-0` pods are included. E.g.
# kubectl -n test-cassandra-1519754828-19864 get ep cass-cassandra-1519754828-19864-cassandra-seeds -o "jsonpath={.subsets[*].addresses[*].hostname}"
if ! stdout_equals "cass-${CASS_NAME}-ringnodes-0" \
if ! retry stdout_equals "cass-${CASS_NAME}-ringnodes-0" \
kubectl get pods --namespace "${namespace}" \
--selector=navigator.jetstack.io/cassandra-seed=true \
--output 'jsonpath={.items[*].metadata.name}'
Expand Down Expand Up @@ -392,6 +395,21 @@ function test_cassandracluster() {
then
fail_test "Cassandra liveness probe failed to restart dead node"
fi

export CASS_REPLICAS=2
export CASS_SEEDS=2
if ! apply_cassandracluster
then
fail_test "Failed to apply cassandracluster"
fi

if ! retry stdout_equals "cass-${CASS_NAME}-ringnodes-0 cass-${CASS_NAME}-ringnodes-1" \
kubectl get pods --namespace "${namespace}" \
--selector=navigator.jetstack.io/cassandra-seed=true \
--output 'jsonpath={.items[*].metadata.name}'
then
fail_test "Second cassandra node not marked as seed"
fi
}

if [[ "test_cassandracluster" = "${TEST_PREFIX}"* ]]; then
Expand Down
1 change: 1 addition & 0 deletions hack/testdata/cass-cluster-test.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ spec:
replicas: ${CASS_REPLICAS}
datacenter: "${CASS_NAME}-datacenter"
rack: "{CASS_NAME}-rack"
seeds: ${CASS_SEEDS}
persistence:
enabled: true
size: "5Gi"
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/navigator/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type CassandraClusterNodePool struct {
Datacenter string
Resources v1.ResourceRequirements
SchedulerName string
Seeds *int32
}

type CassandraClusterStatus struct {
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/navigator/v1alpha1/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package v1alpha1

import (
"k8s.io/apimachinery/pkg/runtime"

"github.com/jetstack/navigator/pkg/util/ptr"
)

const (
Expand All @@ -20,4 +22,9 @@ func SetDefaults_CassandraClusterNodePool(np *CassandraClusterNodePool) {
if np.Rack == "" {
np.Rack = np.Name
}

// default to 1 seed if not specified
if np.Seeds == nil {
np.Seeds = ptr.Int32(1)
}
}
5 changes: 5 additions & 0 deletions pkg/apis/navigator/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ type CassandraClusterNodePool struct {
// If not specified, the pod will be dispatched by default scheduler.
// +optional
SchedulerName string `json:"schedulerName,omitempty"`

// Seeds specifies the number of seed nodes to allocate in this nodepool. By
// default, 1 is selected.
// +optional
Seeds *int32 `json:"seeds,omitempty"`
}

type CassandraClusterStatus struct {
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/navigator/v1alpha1/zz_generated.conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func autoConvert_v1alpha1_CassandraClusterNodePool_To_navigator_CassandraCluster
out.Datacenter = in.Datacenter
out.Resources = in.Resources
out.SchedulerName = in.SchedulerName
out.Seeds = (*int32)(unsafe.Pointer(in.Seeds))
return nil
}

Expand All @@ -192,6 +193,7 @@ func autoConvert_navigator_CassandraClusterNodePool_To_v1alpha1_CassandraCluster
out.Datacenter = in.Datacenter
out.Resources = in.Resources
out.SchedulerName = in.SchedulerName
out.Seeds = (*int32)(unsafe.Pointer(in.Seeds))
return nil
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/navigator/v1alpha1/zz_generated.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,15 @@ func (in *CassandraClusterNodePool) DeepCopyInto(out *CassandraClusterNodePool)
}
}
in.Resources.DeepCopyInto(&out.Resources)
if in.Seeds != nil {
in, out := &in.Seeds, &out.Seeds
if *in == nil {
*out = nil
} else {
*out = new(int32)
**out = **in
}
}
return
}

Expand Down
23 changes: 22 additions & 1 deletion pkg/apis/navigator/validation/cassandra.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package validation

import (
"fmt"
"reflect"

apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation"
Expand All @@ -13,7 +14,27 @@ import (
func ValidateCassandraClusterNodePool(np *navigator.CassandraClusterNodePool, fldPath *field.Path) field.ErrorList {
// TODO: call k8s.io/kubernetes/pkg/apis/core/validation.ValidateResourceRequirements on np.Resources
// this will require vendoring kubernetes/kubernetes.
return field.ErrorList{}

allErrs := field.ErrorList{}

if np.Seeds != nil {
if *np.Seeds > np.Replicas {
allErrs = append(allErrs,
field.Invalid(
fldPath.Child("seeds"),
np.Seeds,
fmt.Sprintf("number of seeds cannot be greater than number of replicas (%d)", np.Replicas)),
)
}

if *np.Seeds < 1 {
allErrs = append(allErrs,
field.Invalid(fldPath.Child("seeds"), np.Seeds, "number of seeds must be greater than or equal to 1"),
)
}
}

return allErrs
}

func ValidateCassandraCluster(c *navigator.CassandraCluster) field.ErrorList {
Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/navigator/zz_generated.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,15 @@ func (in *CassandraClusterNodePool) DeepCopyInto(out *CassandraClusterNodePool)
}
}
in.Resources.DeepCopyInto(&out.Resources)
if in.Seeds != nil {
in, out := &in.Seeds, &out.Seeds
if *in == nil {
*out = nil
} else {
*out = new(int32)
**out = **in
}
}
return
}

Expand Down
72 changes: 48 additions & 24 deletions pkg/controllers/cassandra/seedlabeller/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,36 +44,60 @@ func NewControl(

func (c *defaultSeedLabeller) labelSeedNodes(
cluster *v1alpha1.CassandraCluster,
np *v1alpha1.CassandraClusterNodePool,
set *appsv1beta1.StatefulSet,
) error {
// TODO: make number of seed nodes configurable
pod, err := c.pods.Pods(cluster.Namespace).Get(fmt.Sprintf("%s-%d", set.Name, 0))
if err != nil {
glog.Warningf("Couldn't get stateful set pod: %v", err)
return nil
}
labels := pod.Labels
value := labels[service.SeedLabelKey]
if value == service.SeedLabelValue {
return nil
}
if labels == nil {
labels = map[string]string{}
for i := int32(0); i < np.Replicas; i++ {
pod, err := c.pods.Pods(cluster.Namespace).Get(fmt.Sprintf("%s-%d", set.Name, i))
if err != nil {
glog.Warningf("Couldn't get stateful set pod: %v", err)
return nil
}

// label first n as seeds
isSeed := i < *np.Seeds

desiredLabel := ""
if isSeed {
desiredLabel = service.SeedLabelValue
}

labels := pod.Labels
value := labels[service.SeedLabelKey]
if value == desiredLabel {
continue
}
if labels == nil {
labels = map[string]string{}
}

if isSeed {
labels[service.SeedLabelKey] = desiredLabel
} else {
delete(labels, service.SeedLabelKey)
}

podCopy := pod.DeepCopy()
podCopy.SetLabels(labels)
_, err = c.kubeClient.CoreV1().Pods(podCopy.Namespace).Update(podCopy)
if err != nil {
return err
}
}
labels[service.SeedLabelKey] = service.SeedLabelValue
podCopy := pod.DeepCopy()
podCopy.SetLabels(labels)
_, err = c.kubeClient.CoreV1().Pods(podCopy.Namespace).Update(podCopy)
return err
return nil
}

func (c *defaultSeedLabeller) Sync(cluster *v1alpha1.CassandraCluster) error {
sets, err := util.StatefulSetsForCluster(cluster, c.statefulSetLister)
if err != nil {
return err
}
for _, s := range sets {
err = c.labelSeedNodes(cluster, s)
for _, np := range cluster.Spec.NodePools {
setName := util.NodePoolResourceName(cluster, &np)

set, err := c.statefulSetLister.StatefulSets(cluster.Namespace).Get(setName)
if err != nil {
glog.Warningf("Couldn't get stateful set: %v", err)
return nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe an aggregate error here too.

}

err = c.labelSeedNodes(cluster, &np, set)
if err != nil {
return err
}
Expand Down
Loading