Skip to content
This repository has been archived by the owner on Apr 4, 2023. It is now read-only.

Commit

Permalink
Add a Cassandra upgrade mechanism
Browse files Browse the repository at this point in the history
* Add a NodePool.Status.Version attribute which has the lowest reported version from all the pilots in the pool.
* Or nil, if the pilot failed to query its Cassandra database for its version.
* Calculate an UpdateVersion action if the desired Cassandra version is higher than the version reported for a NodePool.
* NodePools are upgraded one at a time.
* Pods within a Nodepool are upgraded one at a time, using a rolling update strategy without any partitioning.
* Upgrades are performed after Nodepools are created and after scale out.

Fixes: #257
  • Loading branch information
wallrj committed May 9, 2018
1 parent 4eb5cb2 commit 9c64f5f
Show file tree
Hide file tree
Showing 27 changed files with 734 additions and 75 deletions.
15 changes: 14 additions & 1 deletion docs/cassandra.rst
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ Supported Configuration Changes
Navigator supports the following changes to a Cassandra cluster:

* :ref:`create-cluster-cassandra`: Add all initially configured node pools and nodes.
* :ref:`minor-upgrade-cassandra`: Trigger a rolling upgrade of Cassandra nodes by increasing the minor and / or patch components of ``CassandraCluster.Spec.Version``.
* :ref:`scale-out-cassandra`: Increase ``CassandraCluster.Spec.NodePools[0].Replicas`` to add more C* nodes to a ``nodepool``.

Navigator does not currently support any other changes to the Cassandra cluster configuration.
Expand All @@ -200,7 +201,6 @@ Unsupported Configuration Changes

The following configuration changes are not currently supported but will be supported in the near future:

* Minor Upgrade: Trigger a rolling Cassandra upgrade by increasing the minor and / or patch components of ``CassandraCluster.Spec.Version``.
* Scale In: Decrease ``CassandraCluster.Spec.NodePools[0].Replicas`` to remove C* nodes from a ``nodepool``.

The following configuration changes are not currently supported:
Expand All @@ -220,6 +220,19 @@ in order of ``NodePool`` and according to the process described in :ref:`scale-o
The order of node creation is determined by the order of the entries in the ``CassandraCluster.Spec.NodePools`` list.
You can look at ``CassandraCluster.Status.NodePools`` to see the current state.

.. _minor-upgrade-cassandra:

Minor Upgrade
~~~~~~~~~~~~~

If you increment the minor or patch number in ``CassandraCluster.Spec.Version``, Navigator will trigger a rolling update of the existing C* nodes.

C* nodes are upgraded serially, in order of NodePool and Pod ordinal, starting with the pod with the highest ordinal in the first NodePool.

`StatefulSet Rolling Updates <https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#rolling-updates>`_ describes the update process in more detail.

.. note:: Major version upgrades are not yet supported.

.. _scale-out-cassandra:

Scale Out
Expand Down
50 changes: 49 additions & 1 deletion hack/e2e.sh
Original file line number Diff line number Diff line change
Expand Up @@ -266,12 +266,23 @@ function test_cassandracluster() {
stdout_equals "${CASS_VERSION}" \
kubectl --namespace "${namespace}" \
get pilots \
--output 'jsonpath={.items[0].status.cassandra.version}'
--selector "navigator.jetstack.io/cassandra-cluster-name=${CASS_NAME}" \
--output 'jsonpath={.items[*].status.cassandra.version}'
then
kubectl --namespace "${namespace}" get pilots -o yaml
fail_test "Pilots failed to report the expected version"
fi

if ! retry TIMEOUT=300 \
stdout_equals "${CASS_VERSION}" \
kubectl --namespace "${namespace}" \
get cassandracluster "${CASS_NAME}" \
--output 'jsonpath={.status.nodePools.*.version}'
then
kubectl --namespace "${namespace}" get cassandracluster -o yaml
fail_test "NodePools failed to report the expected version"
fi

# Wait 5 minutes for cassandra to start and listen for CQL queries.
if ! retry TIMEOUT=300 cql_connect \
"${namespace}" \
Expand Down Expand Up @@ -303,6 +314,43 @@ function test_cassandracluster() {
--debug \
--execute="INSERT INTO space1.testtable1(key, value) VALUES('testkey1', 'testvalue1')"

# Upgrade to newer patch version
export CASS_VERSION="3.11.2"
kubectl apply \
--namespace "${namespace}" \
--filename \
<(envsubst \
'$NAVIGATOR_IMAGE_REPOSITORY:$NAVIGATOR_IMAGE_TAG:$NAVIGATOR_IMAGE_PULLPOLICY:$CASS_NAME:$CASS_REPLICAS:$CASS_CQL_PORT:$CASS_VERSION' \
< "${SCRIPT_DIR}/testdata/cass-cluster-test.template.yaml")

# The cluster is upgraded
if ! retry TIMEOUT=300 kube_event_exists "${namespace}" \
"navigator-controller:CassandraCluster:Normal:UpdateVersion"
then
fail_test "An UpdateVersion event was not recorded"
fi

if ! retry TIMEOUT=300 \
stdout_equals "${CASS_VERSION}" \
kubectl --namespace "${namespace}" \
get pilots \
--selector "navigator.jetstack.io/cassandra-cluster-name=${CASS_NAME}" \
--output 'jsonpath={.items[*].status.cassandra.version}'
then
kubectl --namespace "${namespace}" get pilots -o yaml
fail_test "Pilots failed to report the expected version"
fi

if ! retry TIMEOUT=300 \
stdout_equals "${CASS_VERSION}" \
kubectl --namespace "${namespace}" \
get cassandracluster "${CASS_NAME}" \
--output 'jsonpath={.status.nodePools.*.version}'
then
kubectl --namespace "${namespace}" get cassandracluster -o yaml
fail_test "NodePools failed to report the expected version"
fi

# Delete the Cassandra pod and wait for the CQL service to become
# unavailable (readiness probe fails)

Expand Down
1 change: 1 addition & 0 deletions hack/testdata/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ controller:
repository: quay.io/jetstack/navigator-controller
tag: build
pullPolicy: Never
logLevel: 4
14 changes: 10 additions & 4 deletions internal/test/util/generate/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,24 @@ func StatefulSet(c StatefulSetConfig) *apps.StatefulSet {

type CassandraClusterConfig struct {
Name, Namespace string
Version *version.Version
}

func CassandraCluster(c CassandraClusterConfig) *v1alpha1.CassandraCluster {
return &v1alpha1.CassandraCluster{
o := &v1alpha1.CassandraCluster{
ObjectMeta: metav1.ObjectMeta{
Name: c.Name,
Namespace: c.Namespace,
},
Spec: v1alpha1.CassandraClusterSpec{
Version: *version.New("3.11.2"),
},
Spec: v1alpha1.CassandraClusterSpec{},
}
if c.Version == nil {
o.Spec.Version = *version.New("3.11.2")
} else {
o.Spec.Version = *c.Version
}

return o
}

type CassandraClusterNodePoolConfig struct {
Expand Down
34 changes: 34 additions & 0 deletions pkg/api/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package version
import (
"encoding/json"
"strconv"
"strings"

semver "github.com/hashicorp/go-version"
)
Expand Down Expand Up @@ -59,6 +60,11 @@ func (v *Version) Semver() *semver.Version {
return v.semver
}

// TODO: Add tests for this
func (v *Version) LessThan(versionB *Version) bool {
return v.semver.LessThan(versionB.semver)
}

func (v *Version) UnmarshalJSON(data []byte) error {
s, err := strconv.Unquote(string(data))
if err != nil {
Expand All @@ -84,3 +90,31 @@ func (v Version) DeepCopy() Version {
}
return *New(v.String())
}

func (v *Version) bump(i int) *Version {
v2 := v.DeepCopy()
parts := strings.Split(v2.Semver().String(), ".")
part, err := strconv.Atoi(parts[i])
if err != nil {
panic(err)
}
part++
parts[i] = strconv.Itoa(part)
return New(strings.Join(parts, "."))
}

func (v *Version) BumpMajor() *Version {
return v.bump(0)
}

func (v *Version) BumpMinor() *Version {
return v.bump(1)
}

func (v *Version) BumpPatch() *Version {
return v.bump(2)
}

func (v *Version) Major() int64 {
return v.semver.Segments64()[0]
}
1 change: 1 addition & 0 deletions pkg/apis/navigator/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type CassandraClusterStatus struct {

type CassandraClusterNodePoolStatus struct {
ReadyReplicas int32
Version *version.Version
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/navigator/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ type CassandraClusterStatus struct {
type CassandraClusterNodePoolStatus struct {
// The number of replicas in the node pool that are currently 'Ready'.
ReadyReplicas int32 `json:"readyReplicas"`
// The lowest version of Cassandra found to be running in this nodepool,
// as reported by the Cassandra process.
// nil or empty if the lowest version can not be determined,
// or if the lowest version has not yet been determined
// +optional
Version *version.Version `json:"version,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/navigator/v1alpha1/zz_generated.conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func Convert_navigator_CassandraClusterNodePool_To_v1alpha1_CassandraClusterNode

func autoConvert_v1alpha1_CassandraClusterNodePoolStatus_To_navigator_CassandraClusterNodePoolStatus(in *CassandraClusterNodePoolStatus, out *navigator.CassandraClusterNodePoolStatus, s conversion.Scope) error {
out.ReadyReplicas = in.ReadyReplicas
out.Version = (*version.Version)(unsafe.Pointer(in.Version))
return nil
}

Expand All @@ -207,6 +208,7 @@ func Convert_v1alpha1_CassandraClusterNodePoolStatus_To_navigator_CassandraClust

func autoConvert_navigator_CassandraClusterNodePoolStatus_To_v1alpha1_CassandraClusterNodePoolStatus(in *navigator.CassandraClusterNodePoolStatus, out *CassandraClusterNodePoolStatus, s conversion.Scope) error {
out.ReadyReplicas = in.ReadyReplicas
out.Version = (*version.Version)(unsafe.Pointer(in.Version))
return nil
}

Expand Down
13 changes: 12 additions & 1 deletion pkg/apis/navigator/v1alpha1/zz_generated.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@ func (in *CassandraClusterNodePool) DeepCopy() *CassandraClusterNodePool {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *CassandraClusterNodePoolStatus) DeepCopyInto(out *CassandraClusterNodePoolStatus) {
*out = *in
if in.Version != nil {
in, out := &in.Version, &out.Version
if *in == nil {
*out = nil
} else {
*out = new(version.Version)
**out = **in
}
}
return
}

Expand Down Expand Up @@ -206,7 +215,9 @@ func (in *CassandraClusterStatus) DeepCopyInto(out *CassandraClusterStatus) {
in, out := &in.NodePools, &out.NodePools
*out = make(map[string]CassandraClusterNodePoolStatus, len(*in))
for key, val := range *in {
(*out)[key] = val
newVal := new(CassandraClusterNodePoolStatus)
val.DeepCopyInto(newVal)
(*out)[key] = *newVal
}
}
return
Expand Down
14 changes: 12 additions & 2 deletions pkg/apis/navigator/validation/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,30 @@ func ValidateCassandraClusterUpdate(old, new *navigator.CassandraCluster) field.

fldPath := field.NewPath("spec")

if !new.Spec.Version.Equal(&old.Spec.Version) {
if new.Spec.Version.LessThan(&old.Spec.Version) {
allErrs = append(
allErrs,
field.Forbidden(
fldPath.Child("version"),
fmt.Sprintf(
"cannot change the version of an existing cluster. "+
"cannot perform version downgrades. "+
"old version: %s, new version: %s",
old.Spec.Version, new.Spec.Version,
),
),
)
}

if new.Spec.Version.Major() != old.Spec.Version.Major() {
allErrs = append(
allErrs,
field.Forbidden(
fldPath.Child("version"),
"cannot perform major version upgrades",
),
)
}

npPath := fldPath.Child("nodePools")
for i, newNp := range new.Spec.NodePools {
idxPath := npPath.Index(i)
Expand Down
16 changes: 12 additions & 4 deletions pkg/apis/navigator/validation/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,23 @@ func TestValidateCassandraClusterUpdate(t *testing.T) {
new: validCassCluster,
},
"downgrade not allowed": {
old: setVersion(validCassCluster, lowerVersion),
new: validCassCluster,
old: validCassCluster,
new: setVersion(validCassCluster, lowerVersion),
errorExpected: true,
},
"upgrade not allowed": {
"major upgrade not allowed": {
old: validCassCluster,
new: setVersion(validCassCluster, higherVersion),
new: setVersion(validCassCluster, validCassCluster.Spec.Version.BumpMajor()),
errorExpected: true,
},
"minor version upgrade": {
old: validCassCluster,
new: setVersion(validCassCluster, validCassCluster.Spec.Version.BumpMinor()),
},
"patch version upgrade": {
old: validCassCluster,
new: setVersion(validCassCluster, validCassCluster.Spec.Version.BumpPatch()),
},
}

for title, persistence := range persistenceErrorCases {
Expand Down
13 changes: 12 additions & 1 deletion pkg/apis/navigator/zz_generated.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@ func (in *CassandraClusterNodePool) DeepCopy() *CassandraClusterNodePool {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *CassandraClusterNodePoolStatus) DeepCopyInto(out *CassandraClusterNodePoolStatus) {
*out = *in
if in.Version != nil {
in, out := &in.Version, &out.Version
if *in == nil {
*out = nil
} else {
*out = new(version.Version)
**out = **in
}
}
return
}

Expand Down Expand Up @@ -206,7 +215,9 @@ func (in *CassandraClusterStatus) DeepCopyInto(out *CassandraClusterStatus) {
in, out := &in.NodePools, &out.NodePools
*out = make(map[string]CassandraClusterNodePoolStatus, len(*in))
for key, val := range *in {
(*out)[key] = val
newVal := new(CassandraClusterNodePoolStatus)
val.DeepCopyInto(newVal)
(*out)[key] = *newVal
}
}
return
Expand Down
4 changes: 3 additions & 1 deletion pkg/controllers/cassandra/actions/create_nodepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"

"github.com/pkg/errors"

"github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1"
"github.com/jetstack/navigator/pkg/controllers"
"github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool"
Expand All @@ -27,7 +29,7 @@ func (a *CreateNodePool) Execute(s *controllers.State) error {
return nil
}
if err != nil {
return err
return errors.Wrap(err, "unable to create statefulset")
}
s.Recorder.Eventf(
a.Cluster,
Expand Down
Loading

0 comments on commit 9c64f5f

Please sign in to comment.