Skip to content
This repository has been archived by the owner on Apr 4, 2023. It is now read-only.

Commit

Permalink
Create Cassandra Pilot resources
Browse files Browse the repository at this point in the history
* For every pod in the nodepool, create a corresponding Pilot resource.
* Delete Pilot resources for which there is no corresponding Pod.
* Ignore Pods that are not owned by the cluster.
* Stop and error if we encounter Pilots with expected name, but not owned by the cluster.

Fixes: #152
  • Loading branch information
wallrj committed Nov 30, 2017
1 parent 6d60b53 commit fd4494e
Show file tree
Hide file tree
Showing 9 changed files with 423 additions and 32 deletions.
16 changes: 16 additions & 0 deletions pkg/controllers/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
listersv1alpha1 "github.com/jetstack/navigator/pkg/client/listers/navigator/v1alpha1"
"github.com/jetstack/navigator/pkg/controllers"
"github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool"
"github.com/jetstack/navigator/pkg/controllers/cassandra/pilot"
servicecql "github.com/jetstack/navigator/pkg/controllers/cassandra/service/cql"
serviceseedprovider "github.com/jetstack/navigator/pkg/controllers/cassandra/service/seedprovider"
appsinformers "github.com/jetstack/navigator/third_party/k8s.io/client-go/informers/externalversions/apps/v1beta1"
Expand All @@ -37,6 +38,8 @@ type CassandraController struct {
cassListerSynced cache.InformerSynced
serviceListerSynced cache.InformerSynced
statefulSetListerSynced cache.InformerSynced
pilotsListerSynced cache.InformerSynced
podsListerSynced cache.InformerSynced
queue workqueue.RateLimitingInterface
recorder record.EventRecorder
}
Expand All @@ -47,6 +50,8 @@ func NewCassandra(
cassClusters navigatorinformers.CassandraClusterInformer,
services coreinformers.ServiceInformer,
statefulSets appsinformers.StatefulSetInformer,
pilots navigatorinformers.PilotInformer,
pods coreinformers.PodInformer,
recorder record.EventRecorder,
) *CassandraController {
queue := workqueue.NewNamedRateLimitingQueue(
Expand All @@ -65,6 +70,8 @@ func NewCassandra(
cc.cassListerSynced = cassClusters.Informer().HasSynced
cc.serviceListerSynced = services.Informer().HasSynced
cc.statefulSetListerSynced = statefulSets.Informer().HasSynced
cc.pilotsListerSynced = pilots.Informer().HasSynced
cc.podsListerSynced = pods.Informer().HasSynced
cc.control = NewControl(
serviceseedprovider.NewControl(
kubeClient,
Expand All @@ -81,6 +88,13 @@ func NewCassandra(
statefulSets.Lister(),
recorder,
),
pilot.NewControl(
naviClient,
pilots.Lister(),
pods.Lister(),
statefulSets.Lister(),
recorder,
),
recorder,
)
cc.recorder = recorder
Expand Down Expand Up @@ -185,6 +199,8 @@ func CassandraControllerFromContext(ctx *controllers.Context) *CassandraControll
ctx.SharedInformerFactory.Navigator().V1alpha1().CassandraClusters(),
ctx.KubeSharedInformerFactory.Core().V1().Services(),
ctx.KubeSharedInformerFactory.Apps().V1beta1().StatefulSets(),
ctx.SharedInformerFactory.Navigator().V1alpha1().Pilots(),
ctx.KubeSharedInformerFactory.Core().V1().Pods(),
ctx.Recorder,
)
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/controllers/cassandra/cluster_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/golang/glog"
v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1"
"github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool"
"github.com/jetstack/navigator/pkg/controllers/cassandra/pilot"
servicecql "github.com/jetstack/navigator/pkg/controllers/cassandra/service/cql"
serviceseedprovider "github.com/jetstack/navigator/pkg/controllers/cassandra/service/seedprovider"
apiv1 "k8s.io/api/core/v1"
Expand All @@ -19,6 +20,7 @@ const (
MessageErrorSyncConfigMap = "Error syncing config map: %s"
MessageErrorSyncService = "Error syncing service: %s"
MessageErrorSyncNodePools = "Error syncing node pools: %s"
MessageErrorSyncPilots = "Error syncing pilots: %s"
MessageSuccessSync = "Successfully synced CassandraCluster"
)

Expand All @@ -32,19 +34,22 @@ type defaultCassandraClusterControl struct {
seedProviderServiceControl serviceseedprovider.Interface
cqlServiceControl servicecql.Interface
nodepoolControl nodepool.Interface
pilotControl pilot.Interface
recorder record.EventRecorder
}

func NewControl(
seedProviderServiceControl serviceseedprovider.Interface,
cqlServiceControl servicecql.Interface,
nodepoolControl nodepool.Interface,
pilotControl pilot.Interface,
recorder record.EventRecorder,
) ControlInterface {
return &defaultCassandraClusterControl{
seedProviderServiceControl: seedProviderServiceControl,
cqlServiceControl: cqlServiceControl,
nodepoolControl: nodepoolControl,
pilotControl: pilotControl,
recorder: recorder,
}
}
Expand Down Expand Up @@ -84,6 +89,17 @@ func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) erro
)
return err
}
err = e.pilotControl.Sync(c)
if err != nil {
e.recorder.Eventf(
c,
apiv1.EventTypeWarning,
ErrorSync,
MessageErrorSyncPilots,
err,
)
return err
}
e.recorder.Event(
c,
apiv1.EventTypeNormal,
Expand Down
25 changes: 2 additions & 23 deletions pkg/controllers/cassandra/nodepool/nodepool.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package nodepool

import (
"fmt"

v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1"
"github.com/jetstack/navigator/pkg/controllers/cassandra/util"
apps "k8s.io/api/apps/v1beta1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
appslisters "k8s.io/client-go/listers/apps/v1beta1"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -37,23 +33,6 @@ func NewControl(
}
}

func ownerCheck(
set *apps.StatefulSet,
cluster *v1alpha1.CassandraCluster,
) error {
if !metav1.IsControlledBy(set, cluster) {
ownerRef := metav1.GetControllerOf(set)
return fmt.Errorf(
"Foreign owned StatefulSet: "+
"A StatefulSet with name '%s/%s' already exists, "+
"but it is controlled by '%v', not '%s/%s'.",
set.Namespace, set.Name, ownerRef,
cluster.Namespace, cluster.Name,
)
}
return nil
}

func (e *defaultCassandraClusterNodepoolControl) removeUnusedStatefulSets(
cluster *v1alpha1.CassandraCluster,
) error {
Expand All @@ -73,7 +52,7 @@ func (e *defaultCassandraClusterNodepoolControl) removeUnusedStatefulSets(
return err
}
for _, set := range existingSets {
err := ownerCheck(set, cluster)
err := util.OwnerCheck(set, cluster)
if err != nil {
return err
}
Expand Down Expand Up @@ -103,7 +82,7 @@ func (e *defaultCassandraClusterNodepoolControl) createOrUpdateStatefulSet(
if err != nil {
return err
}
err = ownerCheck(existingSet, cluster)
err = util.OwnerCheck(existingSet, cluster)
if err != nil {
return err
}
Expand Down
165 changes: 165 additions & 0 deletions pkg/controllers/cassandra/pilot/pilot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package pilot

import (
"github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1"
navigator "github.com/jetstack/navigator/pkg/client/clientset/versioned"
navlisters "github.com/jetstack/navigator/pkg/client/listers/navigator/v1alpha1"
"github.com/jetstack/navigator/pkg/controllers"
"github.com/jetstack/navigator/pkg/controllers/cassandra/util"
"k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
appslisters "k8s.io/client-go/listers/apps/v1beta1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
)

type Interface interface {
Sync(*v1alpha1.CassandraCluster) error
}

type pilotControl struct {
naviClient navigator.Interface
pilots navlisters.PilotLister
pods corelisters.PodLister
statefulSets appslisters.StatefulSetLister
recorder record.EventRecorder
}

var _ Interface = &pilotControl{}

func NewControl(
naviClient navigator.Interface,
pilots navlisters.PilotLister,
pods corelisters.PodLister,
statefulSets appslisters.StatefulSetLister,
recorder record.EventRecorder,
) *pilotControl {
return &pilotControl{
naviClient: naviClient,
pilots: pilots,
pods: pods,
statefulSets: statefulSets,
recorder: recorder,
}

}

func (c *pilotControl) clusterPods(cluster *v1alpha1.CassandraCluster) ([]*v1.Pod, error) {
var clusterPods []*v1.Pod
allPods, err := c.pods.Pods(cluster.Namespace).List(labels.Everything())
if err != nil {
return clusterPods, err
}
for _, pod := range allPods {
podControlledByCluster, err := controllers.PodControlledByCluster(
cluster,
pod,
c.statefulSets,
)
if err != nil {
return clusterPods, err
}
if !podControlledByCluster {
continue
}
clusterPods = append(clusterPods, pod)
}
return clusterPods, nil
}

func (c *pilotControl) createOrUpdatePilot(cluster *v1alpha1.CassandraCluster, pod *v1.Pod) error {
desiredPilot := PilotForCluster(cluster, pod)
client := c.naviClient.NavigatorV1alpha1().Pilots(desiredPilot.GetNamespace())
lister := c.pilots.Pilots(desiredPilot.GetNamespace())
existingPilot, err := lister.Get(desiredPilot.GetName())
if k8sErrors.IsNotFound(err) {
_, err = client.Create(desiredPilot)
return err
}
if err != nil {
return err
}
err = util.OwnerCheck(existingPilot, cluster)
if err != nil {
return err
}
desiredPilot = existingPilot.DeepCopy()
updatePilotForCluster(cluster, pod, desiredPilot)
_, err = client.Update(desiredPilot)
return err
}

func (c *pilotControl) removeUnusedPilots(
cluster *v1alpha1.CassandraCluster,
) error {
expectedPilotNames := map[string]bool{}
clusterPods, err := c.clusterPods(cluster)
if err != nil {
return err
}
for _, pod := range clusterPods {
expectedPilotNames[pod.Name] = true
}
existingPilots, err := c.pilots.Pilots(cluster.Namespace).List(labels.Everything())
if err != nil {
return err
}
client := c.naviClient.NavigatorV1alpha1().Pilots(cluster.Namespace)
for _, pilot := range existingPilots {
err := util.OwnerCheck(pilot, cluster)
if err != nil {
return err
}
_, found := expectedPilotNames[pilot.Name]
if !found {
err := client.Delete(pilot.Name, nil)
if err != nil {
return err
}
}
}
return nil
}

func (c *pilotControl) syncPilots(cluster *v1alpha1.CassandraCluster) error {
pods, err := c.clusterPods(cluster)
if err != nil {
return err
}
for _, pod := range pods {
err = c.createOrUpdatePilot(cluster, pod)
if err != nil {
return err
}
}
return err
}

func (c *pilotControl) Sync(cluster *v1alpha1.CassandraCluster) error {
err := c.syncPilots(cluster)
if err != nil {
return err
}
err = c.removeUnusedPilots(cluster)
return err
}

func PilotForCluster(cluster *v1alpha1.CassandraCluster, pod *v1.Pod) *v1alpha1.Pilot {
pilot := &v1alpha1.Pilot{}
ownerRefs := pilot.GetOwnerReferences()
ownerRefs = append(ownerRefs, util.NewControllerRef(cluster))
pilot.SetOwnerReferences(ownerRefs)
return updatePilotForCluster(cluster, pod, pilot)
}

func updatePilotForCluster(
cluster *v1alpha1.CassandraCluster,
pod *v1.Pod,
pilot *v1alpha1.Pilot,
) *v1alpha1.Pilot {
pilot.SetName(pod.GetName())
pilot.SetNamespace(cluster.GetNamespace())
pilot.SetLabels(util.ClusterLabels(cluster))
return pilot
}
Loading

0 comments on commit fd4494e

Please sign in to comment.