Skip to content
This repository has been archived by the owner on Apr 4, 2023. It is now read-only.

Commit

Permalink
Merge pull request #292 from wallrj/simpler-pilot-control
Browse files Browse the repository at this point in the history
Use new unit test fixture for pilot control tests
  • Loading branch information
jetstack-bot authored Mar 27, 2018
2 parents 2872a8c + 1571422 commit 13d9bd1
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 183 deletions.
8 changes: 8 additions & 0 deletions docs/supplementary-resources.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
Pilots
------

Navigator creates one ``Pilot`` resource for every database node.
``Pilot`` resources have the same name and name space as the ``Pod`` for the corresponding database node.
The ``Pilot.Spec`` is read by the pilot process running inside a ``Pod`` and contains its desired configuration.
The ``Pilot.Status`` is updated by the pilot process and contains the discovered state of a single database node.

Other Supplementary Resources
-----------------------------

Expand Down
85 changes: 14 additions & 71 deletions pkg/controllers/cassandra/pilot/pilot.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package pilot

import (
"fmt"
"hash/fnv"
"reflect"

"k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -18,7 +14,6 @@ import (
navlisters "github.com/jetstack/navigator/pkg/client/listers/navigator/v1alpha1"
"github.com/jetstack/navigator/pkg/controllers"
"github.com/jetstack/navigator/pkg/controllers/cassandra/util"
hashutil "github.com/jetstack/navigator/pkg/util/hash"
)

const (
Expand Down Expand Up @@ -79,29 +74,20 @@ func (c *pilotControl) clusterPods(cluster *v1alpha1.CassandraCluster) ([]*v1.Po
return clusterPods, nil
}

func (c *pilotControl) createOrUpdatePilot(cluster *v1alpha1.CassandraCluster, pod *v1.Pod) error {
func (c *pilotControl) createPilot(cluster *v1alpha1.CassandraCluster, pod *v1.Pod) error {
desiredPilot := PilotForCluster(cluster, pod)
client := c.naviClient.NavigatorV1alpha1().Pilots(desiredPilot.GetNamespace())
lister := c.pilots.Pilots(desiredPilot.GetNamespace())
existingPilot, err := lister.Get(desiredPilot.GetName())
if k8sErrors.IsNotFound(err) {
_, err = client.Create(desiredPilot)
return err
// Pilot already exists
if err == nil {
return util.OwnerCheck(existingPilot, cluster)
}
if err != nil {
// The only error we expect is that the pilot does not exist.
if !k8sErrors.IsNotFound(err) {
return err
}
err = util.OwnerCheck(existingPilot, cluster)
if err != nil {
return err
}
existingPilot = existingPilot.DeepCopy()
existingPilot.Status = v1alpha1.PilotStatus{}
desiredPilot = existingPilot.DeepCopy()
desiredPilot = updatePilotForCluster(cluster, pod, desiredPilot)
if !reflect.DeepEqual(desiredPilot, existingPilot) {
_, err = client.Update(desiredPilot)
}
_, err = client.Create(desiredPilot)
return err
}

Expand All @@ -111,7 +97,7 @@ func (c *pilotControl) syncPilots(cluster *v1alpha1.CassandraCluster) error {
return err
}
for _, pod := range pods {
err = c.createOrUpdatePilot(cluster, pod)
err = c.createPilot(cluster, pod)
if err != nil {
return err
}
Expand All @@ -129,55 +115,12 @@ func (c *pilotControl) Sync(cluster *v1alpha1.CassandraCluster) error {
}

func PilotForCluster(cluster *v1alpha1.CassandraCluster, pod *v1.Pod) *v1alpha1.Pilot {
pilot := &v1alpha1.Pilot{}
pilot.SetOwnerReferences(
[]metav1.OwnerReference{
util.NewControllerRef(cluster),
return &v1alpha1.Pilot{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
Labels: util.ClusterLabels(cluster),
OwnerReferences: []metav1.OwnerReference{util.NewControllerRef(cluster)},
},
)
return updatePilotForCluster(cluster, pod, pilot)
}

func updatePilotForCluster(
cluster *v1alpha1.CassandraCluster,
pod *v1.Pod,
pilot *v1alpha1.Pilot,
) *v1alpha1.Pilot {
pilot.SetName(pod.GetName())
pilot.SetNamespace(cluster.GetNamespace())
labels := pilot.GetLabels()
if labels == nil {
labels = map[string]string{}
}
for key, val := range util.ClusterLabels(cluster) {
labels[key] = val
}
pilot.SetLabels(labels)
ComputeHashAndUpdateAnnotation(pilot)
return pilot
}

func ComputeHash(p *v1alpha1.Pilot) uint32 {
hashVar := []interface{}{
p.Spec,
p.ObjectMeta,
p.Labels,
}
hasher := fnv.New32()
hashutil.DeepHashObject(hasher, hashVar)
return hasher.Sum32()
}

func UpdateHashAnnotation(p *v1alpha1.Pilot, hash uint32) {
annotations := p.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
annotations[HashAnnotationKey] = fmt.Sprintf("%d", hash)
p.SetAnnotations(annotations)
}

func ComputeHashAndUpdateAnnotation(p *v1alpha1.Pilot) {
hash := ComputeHash(p)
UpdateHashAnnotation(p, hash)
}
220 changes: 108 additions & 112 deletions pkg/controllers/cassandra/pilot/pilot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (
"k8s.io/api/core/v1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"

"github.com/jetstack/navigator/internal/test/unit/framework"
"github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1"
"github.com/jetstack/navigator/pkg/controllers"
"github.com/jetstack/navigator/pkg/controllers/cassandra/pilot"
casstesting "github.com/jetstack/navigator/pkg/controllers/cassandra/testing"
"github.com/jetstack/navigator/pkg/controllers/cassandra/util"
Expand All @@ -25,121 +28,114 @@ func clusterPod(cluster *v1alpha1.CassandraCluster, name string) *v1.Pod {
return pod
}

func nonClusterPod(cluster *v1alpha1.CassandraCluster, name string) *v1.Pod {
p := clusterPod(cluster, name)
p.SetOwnerReferences([]metav1.OwnerReference{})
return p
}

func TestPilotSync(t *testing.T) {
t.Run(
"each cluster pod gets a pilot",
func(t *testing.T) {
f := casstesting.NewFixture(t)
f.AddObjectK(clusterPod(f.Cluster, "foo"))
f.AddObjectK(clusterPod(f.Cluster, "bar"))
f.Run()
f.AssertPilotsLength(2)
},
)
t.Run(
"non-cluster pods are ignored",
func(t *testing.T) {
f := casstesting.NewFixture(t)
f.AddObjectK(clusterPod(f.Cluster, "foo"))
f.AddObjectK(nonClusterPod(f.Cluster, "bar"))
f.Run()
f.AssertPilotsLength(1)
},
)
t.Run(
"pilot exists",
func(t *testing.T) {
f := casstesting.NewFixture(t)
pod := clusterPod(f.Cluster, "foo")
pilot := pilot.PilotForCluster(f.Cluster, pod)
f.AddObjectK(pod)
f.AddObjectN(pilot)
f.Run()
f.AssertPilotsLength(1)
},
)
t.Run(
"foreign owned pilot",
func(t *testing.T) {
f := casstesting.NewFixture(t)
pod := clusterPod(f.Cluster, "foo")
pilot := pilot.PilotForCluster(f.Cluster, pod)
pilot.SetOwnerReferences([]metav1.OwnerReference{})
f.AddObjectK(pod)
f.AddObjectN(pilot)
f.RunExpectError()
f.AssertPilotsLength(1)
cluster1 := casstesting.ClusterForTest()
cluster1pod1 := clusterPod(cluster1, "c1p1")
cluster1pod2 := clusterPod(cluster1, "c1p2")
cluster1pilot1 := pilot.PilotForCluster(cluster1, cluster1pod1)
cluster1pilot1foreign := cluster1pilot1.DeepCopy()
cluster1pilot1foreign.SetOwnerReferences([]metav1.OwnerReference{})

cluster2 := casstesting.ClusterForTest()
cluster2.SetName("cluster2")
cluster2.SetUID("uid2")
cluster2pod1 := clusterPod(cluster2, "c2p1")

type testT struct {
kubeObjects []runtime.Object
navObjects []runtime.Object
cluster *v1alpha1.CassandraCluster
assertions func(*testing.T, *controllers.State)
expectErr bool
}

tests := map[string]testT{
"each cluster pod gets a pilot": {
kubeObjects: []runtime.Object{
cluster1pod1,
cluster1pod2,
cluster2pod1,
},
cluster: cluster1,
assertions: func(t *testing.T, state *controllers.State) {
pilots, err := state.NavigatorClientset.
Navigator().Pilots(cluster1.Namespace).List(metav1.ListOptions{})
if err != nil {
t.Fatal(err)
}
expectedPilotCount := 2
pilotCount := len(pilots.Items)
if pilotCount != expectedPilotCount {
t.Log(pilots.Items)
t.Errorf("Unexpected pilot count: %d != %d", expectedPilotCount, pilotCount)
}
},
},
)
t.Run(
"pilot sync when hash changes",
func(t *testing.T) {
f := casstesting.NewFixture(t)
pod := clusterPod(f.Cluster, "foo")
unsyncedPilot := pilot.PilotForCluster(f.Cluster, pod)
pilot.UpdateHashAnnotation(unsyncedPilot, 0)
f.AddObjectK(pod)
f.AddObjectN(unsyncedPilot)
f.Run()
f.AssertPilotsLength(1)
updatedPilot := f.Pilots().Items[0]
updatedPilotAnnotations := updatedPilot.GetAnnotations()
hash, ok := updatedPilotAnnotations[pilot.HashAnnotationKey]
if !ok {
t.Log(updatedPilotAnnotations)
t.Error("pilot hash annotation not found")
}
if hash == "0" {
t.Log(updatedPilot)
t.Error("Pilot was not updated")
}
"non-cluster pods are ignored": {
kubeObjects: []runtime.Object{
cluster1pod1,
cluster2pod1,
},
cluster: cluster1,
assertions: func(t *testing.T, state *controllers.State) {
pilots, err := state.NavigatorClientset.
Navigator().Pilots(cluster1.Namespace).List(metav1.ListOptions{})
if err != nil {
t.Fatal(err)
}
expectedPilotCount := 1
pilotCount := len(pilots.Items)
if pilotCount != expectedPilotCount {
t.Log(pilots.Items)
t.Errorf("Unexpected pilot count: %d != %d", expectedPilotCount, pilotCount)
}
},
},
)
t.Run(
"pilot no sync if hash matches",
func(t *testing.T) {
f := casstesting.NewFixture(t)
pod := clusterPod(f.Cluster, "foo")
// Remove the labels
unsyncedPilot := pilot.PilotForCluster(f.Cluster, pod)
unsyncedPilot.SetLabels(map[string]string{})
pilot.ComputeHashAndUpdateAnnotation(unsyncedPilot)
f.AddObjectK(pod)
f.AddObjectN(unsyncedPilot)
f.Run()
f.AssertPilotsLength(1)
updatedPilot := f.Pilots().Items[0]
updatedLabels := updatedPilot.GetLabels()
if len(updatedLabels) == 0 {
t.Log(updatedPilot)
t.Error("pilot was not updated")
}
"no error if pilot exists": {
kubeObjects: []runtime.Object{cluster1pod1},
navObjects: []runtime.Object{cluster1pilot1},
cluster: cluster1,
},
)
t.Run(
"don't clobber custom labels",
func(t *testing.T) {
f := casstesting.NewFixture(t)
pod := clusterPod(f.Cluster, "foo")
// Remove the labels
unsyncedPilot := pilot.PilotForCluster(f.Cluster, pod)
unsyncedPilot.Labels["foo"] = "bar"
f.AddObjectK(pod)
f.AddObjectN(unsyncedPilot)
f.Run()
f.AssertPilotsLength(1)
updatedPilot := f.Pilots().Items[0]
updatedLabels := updatedPilot.GetLabels()
if updatedLabels["foo"] != "bar" {
t.Log(updatedLabels)
t.Error("custom labels were altered")
}
"error if foreign owned": {
kubeObjects: []runtime.Object{cluster1pod1},
navObjects: []runtime.Object{cluster1pilot1foreign},
cluster: cluster1,
expectErr: true,
},
)
}
for title, test := range tests {
t.Run(
title,
func(t *testing.T) {
fixture := &framework.StateFixture{
T: t,
KubeObjects: test.kubeObjects,
NavigatorObjects: test.navObjects,
}
fixture.Start()
defer fixture.Stop()
state := fixture.State()
c := pilot.NewControl(
state.NavigatorClientset,
state.PilotLister,
state.PodLister,
state.StatefulSetLister,
state.Recorder,
)
err := c.Sync(test.cluster)
if err != nil {
if !test.expectErr {
t.Errorf("Unexpected error: %s", err)
}
} else {
if test.expectErr {
t.Error("Missing error")
}
}
if test.assertions != nil {
test.assertions(t, state)
}
},
)
}
}

0 comments on commit 13d9bd1

Please sign in to comment.