Skip to content
This repository has been archived by the owner on Apr 4, 2023. It is now read-only.

Add a Cassandra upgrade mechanism #352

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion docs/cassandra.rst
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ Supported Configuration Changes
Navigator supports the following changes to a Cassandra cluster:

* :ref:`create-cluster-cassandra`: Add all initially configured node pools and nodes.
* :ref:`minor-upgrade-cassandra`: Trigger a rolling upgrade of Cassandra nodes by increasing the minor and / or patch components of ``CassandraCluster.Spec.Version``.
* :ref:`scale-out-cassandra`: Increase ``CassandraCluster.Spec.NodePools[0].Replicas`` to add more C* nodes to a ``nodepool``.

Navigator does not currently support any other changes to the Cassandra cluster configuration.
Expand All @@ -202,7 +203,6 @@ Unsupported Configuration Changes

The following configuration changes are not currently supported but will be supported in the near future:

* Minor Upgrade: Trigger a rolling Cassandra upgrade by increasing the minor and / or patch components of ``CassandraCluster.Spec.Version``.
* Scale In: Decrease ``CassandraCluster.Spec.NodePools[0].Replicas`` to remove C* nodes from a ``nodepool``.

The following configuration changes are not currently supported:
Expand All @@ -222,6 +222,19 @@ in order of ``NodePool`` and according to the process described in :ref:`scale-o
The order of node creation is determined by the order of the entries in the ``CassandraCluster.Spec.NodePools`` list.
You can look at ``CassandraCluster.Status.NodePools`` to see the current state.

.. _minor-upgrade-cassandra:

Minor Upgrade
~~~~~~~~~~~~~

If you increment the minor or patch number in ``CassandraCluster.Spec.Version``, Navigator will trigger a rolling update of the existing C* nodes.

C* nodes are upgraded serially, in order of NodePool and Pod ordinal, starting with the pod with the highest ordinal in the first NodePool.

`StatefulSet Rolling Updates <https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#rolling-updates>`_ describes the update process in more detail.

.. note:: Major version upgrades are not yet supported.

.. _scale-out-cassandra:

Scale Out
Expand Down
52 changes: 50 additions & 2 deletions hack/e2e.sh
Original file line number Diff line number Diff line change
Expand Up @@ -266,12 +266,23 @@ function test_cassandracluster() {
stdout_equals "${CASS_VERSION}" \
kubectl --namespace "${namespace}" \
get pilots \
--output 'jsonpath={.items[0].status.cassandra.version}'
--selector "navigator.jetstack.io/cluster-type=CassandraCluster,navigator.jetstack.io/cluster-name=${CASS_NAME}" \
--output 'jsonpath={.items[*].status.cassandra.version}'
then
kubectl --namespace "${namespace}" get pilots -o yaml
fail_test "Pilots failed to report the expected version"
fi

if ! retry TIMEOUT=300 \
stdout_equals "${CASS_VERSION}" \
kubectl --namespace "${namespace}" \
get cassandracluster "${CASS_NAME}" \
--output 'jsonpath={.status.nodePools.*.version}'
then
kubectl --namespace "${namespace}" get cassandracluster -o yaml
fail_test "NodePools failed to report the expected version"
fi

# Wait 5 minutes for cassandra to start and listen for CQL queries.
if ! retry TIMEOUT=300 cql_connect \
"${namespace}" \
Expand Down Expand Up @@ -303,6 +314,43 @@ function test_cassandracluster() {
--debug \
--execute="INSERT INTO space1.testtable1(key, value) VALUES('testkey1', 'testvalue1')"

# Upgrade to newer patch version
export CASS_VERSION="3.11.2"
kubectl apply \
--namespace "${namespace}" \
--filename \
<(envsubst \
'$NAVIGATOR_IMAGE_REPOSITORY:$NAVIGATOR_IMAGE_TAG:$NAVIGATOR_IMAGE_PULLPOLICY:$CASS_NAME:$CASS_NODEPOOL1_DATACENTER:$CASS_NODEPOOL1_RACK:$CASS_NODEPOOL1_NAME:$CASS_REPLICAS:$CASS_VERSION' \
< "${SCRIPT_DIR}/testdata/cass-cluster-test.template.yaml")

# The cluster is upgraded
if ! retry TIMEOUT=300 kube_event_exists "${namespace}" \
"navigator-controller:CassandraCluster:Normal:UpdateVersion"
then
fail_test "An UpdateVersion event was not recorded"
fi

if ! retry TIMEOUT=300 \
stdout_equals "${CASS_VERSION}" \
kubectl --namespace "${namespace}" \
get pilots \
--selector "navigator.jetstack.io/cluster-type=CassandraCluster,navigator.jetstack.io/cluster-name=${CASS_NAME}" \
--output 'jsonpath={.items[*].status.cassandra.version}'
then
kubectl --namespace "${namespace}" get pilots -o yaml
fail_test "Pilots failed to report the expected version"
fi

if ! retry TIMEOUT=300 \
stdout_equals "${CASS_VERSION}" \
kubectl --namespace "${namespace}" \
get cassandracluster "${CASS_NAME}" \
--output 'jsonpath={.status.nodePools.*.version}'
then
kubectl --namespace "${namespace}" get cassandracluster -o yaml
fail_test "NodePools failed to report the expected version"
fi

# Delete the Cassandra pod and wait for the CQL service to become
# unavailable (readiness probe fails)

Expand Down Expand Up @@ -344,7 +392,7 @@ function test_cassandracluster() {
# Get names of nodepool pods before the scale out (line separated)
local original_pods_file="${ARTIFACTS_DIR}/test_cassandra.scale_out_original_pods"
kubectl --namespace "${namespace}" get pods \
--selector="navigator.jetstack.io/cassandra-cluster-name=${CASS_NAME}" \
--selector "navigator.jetstack.io/cluster-type=CassandraCluster,navigator.jetstack.io/cluster-name=${CASS_NAME}" \
--output='jsonpath={range .items[*]}{.metadata.name}{"\n"}{end}' \
> "${original_pods_file}"

Expand Down
19 changes: 13 additions & 6 deletions internal/test/util/generate/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ func Pilot(c PilotConfig) *v1alpha1.Pilot {
c.Namespace = "default"
}
labels := map[string]string{}
labels[v1alpha1.ElasticsearchClusterNameLabel] = c.Cluster
labels[v1alpha1.ElasticsearchNodePoolNameLabel] = c.NodePool
labels[v1alpha1.ClusterTypeLabel] = "ElasticsearchCluster"
labels[v1alpha1.ClusterNameLabel] = c.Cluster
labels[v1alpha1.NodePoolNameLabel] = c.NodePool
var version *semver.Version
if c.Version != "" {
version = semver.New(c.Version)
Expand Down Expand Up @@ -122,18 +123,24 @@ func StatefulSet(c StatefulSetConfig) *apps.StatefulSet {

type CassandraClusterConfig struct {
Name, Namespace string
Version *version.Version
}

func CassandraCluster(c CassandraClusterConfig) *v1alpha1.CassandraCluster {
return &v1alpha1.CassandraCluster{
o := &v1alpha1.CassandraCluster{
ObjectMeta: metav1.ObjectMeta{
Name: c.Name,
Namespace: c.Namespace,
},
Spec: v1alpha1.CassandraClusterSpec{
Version: *version.New("3.11.2"),
},
Spec: v1alpha1.CassandraClusterSpec{},
}
if c.Version == nil {
o.Spec.Version = *version.New("3.11.2")
} else {
o.Spec.Version = *c.Version
}

return o
}

type CassandraClusterNodePoolConfig struct {
Expand Down
34 changes: 34 additions & 0 deletions pkg/api/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package version
import (
"encoding/json"
"strconv"
"strings"

semver "github.com/hashicorp/go-version"
)
Expand Down Expand Up @@ -59,6 +60,11 @@ func (v *Version) Semver() *semver.Version {
return v.semver
}

// TODO: Add tests for this
func (v *Version) LessThan(versionB *Version) bool {
return v.semver.LessThan(versionB.semver)
}

func (v *Version) UnmarshalJSON(data []byte) error {
s, err := strconv.Unquote(string(data))
if err != nil {
Expand All @@ -84,3 +90,31 @@ func (v Version) DeepCopy() Version {
}
return *New(v.String())
}

func (v *Version) bump(i int) *Version {
v2 := v.DeepCopy()
parts := strings.Split(v2.Semver().String(), ".")
part, err := strconv.Atoi(parts[i])
if err != nil {
panic(err)
}
part++
parts[i] = strconv.Itoa(part)
return New(strings.Join(parts, "."))
}

func (v *Version) BumpMajor() *Version {
return v.bump(0)
}

func (v *Version) BumpMinor() *Version {
return v.bump(1)
}

func (v *Version) BumpPatch() *Version {
return v.bump(2)
}

func (v *Version) Major() int64 {
return v.semver.Segments64()[0]
}
1 change: 1 addition & 0 deletions pkg/apis/navigator/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type CassandraClusterStatus struct {

type CassandraClusterNodePoolStatus struct {
ReadyReplicas int32
Version *version.Version
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
15 changes: 10 additions & 5 deletions pkg/apis/navigator/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ import (
)

const (
ElasticsearchClusterNameLabel = "navigator.jetstack.io/elasticsearch-cluster-name"
ElasticsearchNodePoolNameLabel = "navigator.jetstack.io/elasticsearch-node-pool-name"
ElasticsearchNodePoolVersionAnnotation = "navigator.jetstack.io/elasticsearch-version"
ElasticsearchRoleLabelPrefix = "navigator.jetstack.io/elasticsearch-role-"

CassandraClusterNameLabel = "navigator.jetstack.io/cassandra-cluster-name"
CassandraNodePoolNameLabel = "navigator.jetstack.io/cassandra-node-pool-name"
PilotLabel = "navigator.jetstack.io/has-pilot"
ClusterTypeLabel = "navigator.jetstack.io/cluster-type"
ClusterNameLabel = "navigator.jetstack.io/cluster-name"
NodePoolNameLabel = "navigator.jetstack.io/node-pool-name"
PilotLabel = "navigator.jetstack.io/has-pilot"
)

// +genclient
Expand Down Expand Up @@ -117,6 +116,12 @@ type CassandraClusterStatus struct {
type CassandraClusterNodePoolStatus struct {
// The number of replicas in the node pool that are currently 'Ready'.
ReadyReplicas int32 `json:"readyReplicas"`
// The lowest version of Cassandra found to be running in this nodepool,
// as reported by the Cassandra process.
// nil or empty if the lowest version can not be determined,
// or if the lowest version has not yet been determined
// +optional
Version *version.Version `json:"version,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/navigator/v1alpha1/zz_generated.conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func Convert_navigator_CassandraClusterNodePool_To_v1alpha1_CassandraClusterNode

func autoConvert_v1alpha1_CassandraClusterNodePoolStatus_To_navigator_CassandraClusterNodePoolStatus(in *CassandraClusterNodePoolStatus, out *navigator.CassandraClusterNodePoolStatus, s conversion.Scope) error {
out.ReadyReplicas = in.ReadyReplicas
out.Version = (*version.Version)(unsafe.Pointer(in.Version))
return nil
}

Expand All @@ -211,6 +212,7 @@ func Convert_v1alpha1_CassandraClusterNodePoolStatus_To_navigator_CassandraClust

func autoConvert_navigator_CassandraClusterNodePoolStatus_To_v1alpha1_CassandraClusterNodePoolStatus(in *navigator.CassandraClusterNodePoolStatus, out *CassandraClusterNodePoolStatus, s conversion.Scope) error {
out.ReadyReplicas = in.ReadyReplicas
out.Version = (*version.Version)(unsafe.Pointer(in.Version))
return nil
}

Expand Down
13 changes: 12 additions & 1 deletion pkg/apis/navigator/v1alpha1/zz_generated.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@ func (in *CassandraClusterNodePool) DeepCopy() *CassandraClusterNodePool {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *CassandraClusterNodePoolStatus) DeepCopyInto(out *CassandraClusterNodePoolStatus) {
*out = *in
if in.Version != nil {
in, out := &in.Version, &out.Version
if *in == nil {
*out = nil
} else {
*out = new(version.Version)
**out = **in
}
}
return
}

Expand Down Expand Up @@ -207,7 +216,9 @@ func (in *CassandraClusterStatus) DeepCopyInto(out *CassandraClusterStatus) {
in, out := &in.NodePools, &out.NodePools
*out = make(map[string]CassandraClusterNodePoolStatus, len(*in))
for key, val := range *in {
(*out)[key] = val
newVal := new(CassandraClusterNodePoolStatus)
val.DeepCopyInto(newVal)
(*out)[key] = *newVal
}
}
return
Expand Down
14 changes: 12 additions & 2 deletions pkg/apis/navigator/validation/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,30 @@ func ValidateCassandraClusterUpdate(old, new *navigator.CassandraCluster) field.

fldPath := field.NewPath("spec")

if !new.Spec.Version.Equal(&old.Spec.Version) {
if new.Spec.Version.LessThan(&old.Spec.Version) {
allErrs = append(
allErrs,
field.Forbidden(
fldPath.Child("version"),
fmt.Sprintf(
"cannot change the version of an existing cluster. "+
"cannot perform version downgrades. "+
"old version: %s, new version: %s",
old.Spec.Version, new.Spec.Version,
),
),
)
}

if new.Spec.Version.Major() != old.Spec.Version.Major() {
allErrs = append(
allErrs,
field.Forbidden(
fldPath.Child("version"),
"cannot perform major version upgrades",
),
)
}

npPath := fldPath.Child("nodePools")
for i, newNp := range new.Spec.NodePools {
idxPath := npPath.Index(i)
Expand Down
16 changes: 12 additions & 4 deletions pkg/apis/navigator/validation/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,23 @@ func TestValidateCassandraClusterUpdate(t *testing.T) {
new: validCassCluster,
},
"downgrade not allowed": {
old: setVersion(validCassCluster, lowerVersion),
new: validCassCluster,
old: validCassCluster,
new: setVersion(validCassCluster, lowerVersion),
errorExpected: true,
},
"upgrade not allowed": {
"major upgrade not allowed": {
old: validCassCluster,
new: setVersion(validCassCluster, higherVersion),
new: setVersion(validCassCluster, validCassCluster.Spec.Version.BumpMajor()),
errorExpected: true,
},
"minor version upgrade": {
old: validCassCluster,
new: setVersion(validCassCluster, validCassCluster.Spec.Version.BumpMinor()),
},
"patch version upgrade": {
old: validCassCluster,
new: setVersion(validCassCluster, validCassCluster.Spec.Version.BumpPatch()),
},
}

for title, persistence := range persistenceErrorCases {
Expand Down
13 changes: 12 additions & 1 deletion pkg/apis/navigator/zz_generated.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@ func (in *CassandraClusterNodePool) DeepCopy() *CassandraClusterNodePool {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *CassandraClusterNodePoolStatus) DeepCopyInto(out *CassandraClusterNodePoolStatus) {
*out = *in
if in.Version != nil {
in, out := &in.Version, &out.Version
if *in == nil {
*out = nil
} else {
*out = new(version.Version)
**out = **in
}
}
return
}

Expand Down Expand Up @@ -207,7 +216,9 @@ func (in *CassandraClusterStatus) DeepCopyInto(out *CassandraClusterStatus) {
in, out := &in.NodePools, &out.NodePools
*out = make(map[string]CassandraClusterNodePoolStatus, len(*in))
for key, val := range *in {
(*out)[key] = val
newVal := new(CassandraClusterNodePoolStatus)
val.DeepCopyInto(newVal)
(*out)[key] = *newVal
}
}
return
Expand Down
4 changes: 3 additions & 1 deletion pkg/controllers/cassandra/actions/create_nodepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"

"github.com/pkg/errors"

"github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1"
"github.com/jetstack/navigator/pkg/controllers"
"github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool"
Expand All @@ -27,7 +29,7 @@ func (a *CreateNodePool) Execute(s *controllers.State) error {
return nil
}
if err != nil {
return err
return errors.Wrap(err, "unable to create statefulset")
}
s.Recorder.Eventf(
a.Cluster,
Expand Down
Loading