Skip to content
This repository has been archived by the owner on Apr 4, 2023. It is now read-only.

Commit

Permalink
Merge pull request #153 from wallrj/152-cassandra-pilot-resource
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue.

Create Cassandra Pilot resources

* For every pod in the nodepool, create a corresponding Pilot resource.
 * Delete Pilot resources for which there is no corresponding Pod.
 * Ignore Pods that are not owned by the cluster.
 * Stop and error if we encounter Pilots with expected name, but not owned by the cluster.
    
Fixes: #152

**Release note**:
```release-note
NONE
```
  • Loading branch information
jetstack-ci-bot committed Jan 9, 2018
2 parents f06bde4 + a4c6d7a commit 0e6f38c
Show file tree
Hide file tree
Showing 11 changed files with 466 additions and 54 deletions.
16 changes: 16 additions & 0 deletions pkg/controllers/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
listersv1alpha1 "github.com/jetstack/navigator/pkg/client/listers/navigator/v1alpha1"
"github.com/jetstack/navigator/pkg/controllers"
"github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool"
"github.com/jetstack/navigator/pkg/controllers/cassandra/pilot"
servicecql "github.com/jetstack/navigator/pkg/controllers/cassandra/service/cql"
serviceseedprovider "github.com/jetstack/navigator/pkg/controllers/cassandra/service/seedprovider"
appsinformers "github.com/jetstack/navigator/third_party/k8s.io/client-go/informers/externalversions/apps/v1beta1"
Expand All @@ -37,6 +38,8 @@ type CassandraController struct {
cassListerSynced cache.InformerSynced
serviceListerSynced cache.InformerSynced
statefulSetListerSynced cache.InformerSynced
pilotsListerSynced cache.InformerSynced
podsListerSynced cache.InformerSynced
queue workqueue.RateLimitingInterface
recorder record.EventRecorder
}
Expand All @@ -47,6 +50,8 @@ func NewCassandra(
cassClusters navigatorinformers.CassandraClusterInformer,
services coreinformers.ServiceInformer,
statefulSets appsinformers.StatefulSetInformer,
pilots navigatorinformers.PilotInformer,
pods coreinformers.PodInformer,
recorder record.EventRecorder,
) *CassandraController {
queue := workqueue.NewNamedRateLimitingQueue(
Expand All @@ -65,6 +70,8 @@ func NewCassandra(
cc.cassListerSynced = cassClusters.Informer().HasSynced
cc.serviceListerSynced = services.Informer().HasSynced
cc.statefulSetListerSynced = statefulSets.Informer().HasSynced
cc.pilotsListerSynced = pilots.Informer().HasSynced
cc.podsListerSynced = pods.Informer().HasSynced
cc.control = NewControl(
serviceseedprovider.NewControl(
kubeClient,
Expand All @@ -81,6 +88,13 @@ func NewCassandra(
statefulSets.Lister(),
recorder,
),
pilot.NewControl(
naviClient,
pilots.Lister(),
pods.Lister(),
statefulSets.Lister(),
recorder,
),
recorder,
)
cc.recorder = recorder
Expand Down Expand Up @@ -185,6 +199,8 @@ func CassandraControllerFromContext(ctx *controllers.Context) *CassandraControll
ctx.SharedInformerFactory.Navigator().V1alpha1().CassandraClusters(),
ctx.KubeSharedInformerFactory.Core().V1().Services(),
ctx.KubeSharedInformerFactory.Apps().V1beta1().StatefulSets(),
ctx.SharedInformerFactory.Navigator().V1alpha1().Pilots(),
ctx.KubeSharedInformerFactory.Core().V1().Pods(),
ctx.Recorder,
)
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/controllers/cassandra/cluster_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/golang/glog"
v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1"
"github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool"
"github.com/jetstack/navigator/pkg/controllers/cassandra/pilot"
servicecql "github.com/jetstack/navigator/pkg/controllers/cassandra/service/cql"
serviceseedprovider "github.com/jetstack/navigator/pkg/controllers/cassandra/service/seedprovider"
apiv1 "k8s.io/api/core/v1"
Expand All @@ -19,6 +20,7 @@ const (
MessageErrorSyncConfigMap = "Error syncing config map: %s"
MessageErrorSyncService = "Error syncing service: %s"
MessageErrorSyncNodePools = "Error syncing node pools: %s"
MessageErrorSyncPilots = "Error syncing pilots: %s"
MessageSuccessSync = "Successfully synced CassandraCluster"
)

Expand All @@ -32,19 +34,22 @@ type defaultCassandraClusterControl struct {
seedProviderServiceControl serviceseedprovider.Interface
cqlServiceControl servicecql.Interface
nodepoolControl nodepool.Interface
pilotControl pilot.Interface
recorder record.EventRecorder
}

func NewControl(
seedProviderServiceControl serviceseedprovider.Interface,
cqlServiceControl servicecql.Interface,
nodepoolControl nodepool.Interface,
pilotControl pilot.Interface,
recorder record.EventRecorder,
) ControlInterface {
return &defaultCassandraClusterControl{
seedProviderServiceControl: seedProviderServiceControl,
cqlServiceControl: cqlServiceControl,
nodepoolControl: nodepoolControl,
pilotControl: pilotControl,
recorder: recorder,
}
}
Expand Down Expand Up @@ -84,6 +89,17 @@ func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) erro
)
return err
}
err = e.pilotControl.Sync(c)
if err != nil {
e.recorder.Eventf(
c,
apiv1.EventTypeWarning,
ErrorSync,
MessageErrorSyncPilots,
err,
)
return err
}
e.recorder.Event(
c,
apiv1.EventTypeNormal,
Expand Down
25 changes: 2 additions & 23 deletions pkg/controllers/cassandra/nodepool/nodepool.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package nodepool

import (
"fmt"

v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1"
"github.com/jetstack/navigator/pkg/controllers/cassandra/util"
apps "k8s.io/api/apps/v1beta1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
appslisters "k8s.io/client-go/listers/apps/v1beta1"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -37,23 +33,6 @@ func NewControl(
}
}

func ownerCheck(
set *apps.StatefulSet,
cluster *v1alpha1.CassandraCluster,
) error {
if !metav1.IsControlledBy(set, cluster) {
ownerRef := metav1.GetControllerOf(set)
return fmt.Errorf(
"Foreign owned StatefulSet: "+
"A StatefulSet with name '%s/%s' already exists, "+
"but it is controlled by '%v', not '%s/%s'.",
set.Namespace, set.Name, ownerRef,
cluster.Namespace, cluster.Name,
)
}
return nil
}

func (e *defaultCassandraClusterNodepoolControl) removeUnusedStatefulSets(
cluster *v1alpha1.CassandraCluster,
) error {
Expand All @@ -73,7 +52,7 @@ func (e *defaultCassandraClusterNodepoolControl) removeUnusedStatefulSets(
return err
}
for _, set := range existingSets {
err := ownerCheck(set, cluster)
err := util.OwnerCheck(set, cluster)
if err != nil {
return err
}
Expand Down Expand Up @@ -103,7 +82,7 @@ func (e *defaultCassandraClusterNodepoolControl) createOrUpdateStatefulSet(
if err != nil {
return err
}
err = ownerCheck(existingSet, cluster)
err = util.OwnerCheck(existingSet, cluster)
if err != nil {
return err
}
Expand Down
182 changes: 182 additions & 0 deletions pkg/controllers/cassandra/pilot/pilot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package pilot

import (
"fmt"
"hash/fnv"
"reflect"

"github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1"
navigator "github.com/jetstack/navigator/pkg/client/clientset/versioned"
navlisters "github.com/jetstack/navigator/pkg/client/listers/navigator/v1alpha1"
"github.com/jetstack/navigator/pkg/controllers"
"github.com/jetstack/navigator/pkg/controllers/cassandra/util"
hashutil "github.com/jetstack/navigator/pkg/util/hash"
"k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
appslisters "k8s.io/client-go/listers/apps/v1beta1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
)

const (
HashAnnotationKey = "navigator.jetstack.io/cassandra-pilot-hash"
)

type Interface interface {
Sync(*v1alpha1.CassandraCluster) error
}

type pilotControl struct {
naviClient navigator.Interface
pilots navlisters.PilotLister
pods corelisters.PodLister
statefulSets appslisters.StatefulSetLister
recorder record.EventRecorder
}

var _ Interface = &pilotControl{}

func NewControl(
naviClient navigator.Interface,
pilots navlisters.PilotLister,
pods corelisters.PodLister,
statefulSets appslisters.StatefulSetLister,
recorder record.EventRecorder,
) *pilotControl {
return &pilotControl{
naviClient: naviClient,
pilots: pilots,
pods: pods,
statefulSets: statefulSets,
recorder: recorder,
}

}

func (c *pilotControl) clusterPods(cluster *v1alpha1.CassandraCluster) ([]*v1.Pod, error) {
var clusterPods []*v1.Pod
allPods, err := c.pods.Pods(cluster.Namespace).List(labels.Everything())
if err != nil {
return clusterPods, err
}
for _, pod := range allPods {
podControlledByCluster, err := controllers.PodControlledByCluster(
cluster,
pod,
c.statefulSets,
)
if err != nil {
return clusterPods, err
}
if !podControlledByCluster {
continue
}
clusterPods = append(clusterPods, pod)
}
return clusterPods, nil
}

func (c *pilotControl) createOrUpdatePilot(cluster *v1alpha1.CassandraCluster, pod *v1.Pod) error {
desiredPilot := PilotForCluster(cluster, pod)
client := c.naviClient.NavigatorV1alpha1().Pilots(desiredPilot.GetNamespace())
lister := c.pilots.Pilots(desiredPilot.GetNamespace())
existingPilot, err := lister.Get(desiredPilot.GetName())
if k8sErrors.IsNotFound(err) {
_, err = client.Create(desiredPilot)
return err
}
if err != nil {
return err
}
err = util.OwnerCheck(existingPilot, cluster)
if err != nil {
return err
}
existingPilot = existingPilot.DeepCopy()
existingPilot.Status = v1alpha1.PilotStatus{}
desiredPilot = existingPilot.DeepCopy()
desiredPilot = updatePilotForCluster(cluster, pod, desiredPilot)
if !reflect.DeepEqual(desiredPilot, existingPilot) {
_, err = client.Update(desiredPilot)
}
return err
}

func (c *pilotControl) syncPilots(cluster *v1alpha1.CassandraCluster) error {
pods, err := c.clusterPods(cluster)
if err != nil {
return err
}
for _, pod := range pods {
err = c.createOrUpdatePilot(cluster, pod)
if err != nil {
return err
}
}
return err
}

func (c *pilotControl) Sync(cluster *v1alpha1.CassandraCluster) error {
err := c.syncPilots(cluster)
if err != nil {
return err
}
// TODO: Housekeeping. Remove pilots that don't have a corresponding pod.
return nil
}

func PilotForCluster(cluster *v1alpha1.CassandraCluster, pod *v1.Pod) *v1alpha1.Pilot {
pilot := &v1alpha1.Pilot{}
pilot.SetOwnerReferences(
[]metav1.OwnerReference{
util.NewControllerRef(cluster),
},
)
return updatePilotForCluster(cluster, pod, pilot)
}

func updatePilotForCluster(
cluster *v1alpha1.CassandraCluster,
pod *v1.Pod,
pilot *v1alpha1.Pilot,
) *v1alpha1.Pilot {
pilot.SetName(pod.GetName())
pilot.SetNamespace(cluster.GetNamespace())
labels := pilot.GetLabels()
if labels == nil {
labels = map[string]string{}
}
for key, val := range util.ClusterLabels(cluster) {
labels[key] = val
}
pilot.SetLabels(labels)
ComputeHashAndUpdateAnnotation(pilot)
return pilot
}

func ComputeHash(p *v1alpha1.Pilot) uint32 {
hashVar := []interface{}{
p.Spec,
p.ObjectMeta,
p.Labels,
}
hasher := fnv.New32()
hashutil.DeepHashObject(hasher, hashVar)
return hasher.Sum32()
}

func UpdateHashAnnotation(p *v1alpha1.Pilot, hash uint32) {
annotations := p.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
annotations[HashAnnotationKey] = fmt.Sprintf("%d", hash)
p.SetAnnotations(annotations)
}

func ComputeHashAndUpdateAnnotation(p *v1alpha1.Pilot) {
hash := ComputeHash(p)
UpdateHashAnnotation(p, hash)
}
Loading

0 comments on commit 0e6f38c

Please sign in to comment.