Skip to content

Commit

Permalink
Use filtered informers scoped to given namespace
Browse files Browse the repository at this point in the history
To make this work we can’t use the custom “kluster” index for the pod informer anymore. We now use the standard PodLister with a label selector instead. This might incour a performance cost but I think its negligible and we only do this in state creating anyway.

Manually tested to work.

Fixes #169
  • Loading branch information
databus23 committed Jan 15, 2018
1 parent d9a7513 commit 77ada1e
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 59 deletions.
10 changes: 7 additions & 3 deletions pkg/controller/ground.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
api_v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
listers_v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -49,6 +51,7 @@ type GroundControl struct {
queue workqueue.RateLimitingInterface
klusterInformer cache.SharedIndexInformer
podInformer cache.SharedIndexInformer
podLister listers_v1.PodLister

Logger log.Logger
}
Expand All @@ -65,6 +68,7 @@ func NewGroundController(factories config.Factories, clients config.Clients, rec
queue: workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 300*time.Second)),
klusterInformer: factories.Kubernikus.Kubernikus().V1().Klusters().Informer(),
podInformer: factories.Kubernetes.Core().V1().Pods().Informer(),
podLister: listers_v1.NewPodLister(factories.Kubernetes.Core().V1().Pods().Informer().GetIndexer()),
Logger: logger,
}

Expand Down Expand Up @@ -205,13 +209,13 @@ func (op *GroundControl) handler(key string) error {
"phase", kluster.Status.Phase)
}
case models.KlusterPhaseCreating:
pods, err := op.podInformer.GetIndexer().ByIndex("kluster", kluster.GetName())
pods, err := op.podLister.List(labels.SelectorFromValidatedSet(map[string]string{"release": kluster.GetName()}))
if err != nil {
return err
}
podsReady := 0
for _, obj := range pods {
if kubernetes.IsPodReady(obj.(*api_v1.Pod)) {
for _, pod := range pods {
if kubernetes.IsPodReady(pod) {
podsReady++
}
}
Expand Down
58 changes: 2 additions & 56 deletions pkg/controller/operator.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,16 @@
package controller

import (
"errors"
"fmt"
"sync"
"time"

"github.com/go-kit/kit/log"
api_v1 "k8s.io/api/core/v1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
kubernetes_informers "k8s.io/client-go/informers"
kubernetes_clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"

"github.com/sapcc/kubernikus/pkg/apis/kubernikus/v1"
Expand All @@ -27,9 +20,7 @@ import (
"github.com/sapcc/kubernikus/pkg/client/openstack"
"github.com/sapcc/kubernikus/pkg/controller/config"
"github.com/sapcc/kubernikus/pkg/controller/launch"
kubernikus_clientset "github.com/sapcc/kubernikus/pkg/generated/clientset"
kubernikus_informers "github.com/sapcc/kubernikus/pkg/generated/informers/externalversions"
kubernikus_informers_v1 "github.com/sapcc/kubernikus/pkg/generated/informers/externalversions/kubernikus/v1"
"github.com/sapcc/kubernikus/pkg/version"
)

Expand Down Expand Up @@ -129,9 +120,8 @@ func NewKubernikusOperator(options *KubernikusOperatorOptions, logger log.Logger
return nil, fmt.Errorf("Couldn't create CRD: %s", err)
}

o.Factories.Kubernikus = kubernikus_informers.NewSharedInformerFactory(o.Clients.Kubernikus, DEFAULT_RECONCILIATION)
o.Factories.Kubernetes = kubernetes_informers.NewSharedInformerFactory(o.Clients.Kubernetes, DEFAULT_RECONCILIATION)
o.initializeCustomInformers()
o.Factories.Kubernikus = kubernikus_informers.NewFilteredSharedInformerFactory(o.Clients.Kubernikus, DEFAULT_RECONCILIATION, options.Namespace, nil)
o.Factories.Kubernetes = kubernetes_informers.NewFilteredSharedInformerFactory(o.Clients.Kubernetes, DEFAULT_RECONCILIATION, options.Namespace, nil)

secrets := o.Clients.Kubernetes.Core().Secrets(options.Namespace)
klusters := o.Factories.Kubernikus.Kubernikus().V1().Klusters().Informer()
Expand Down Expand Up @@ -200,47 +190,3 @@ func (o *KubernikusOperator) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
go controller.Run(CONTROLLER_OPTIONS[name], stopCh, wg)
}
}

// MetaLabelReleaseIndexFunc is a default index function that indexes based on an object's release label
func MetaLabelReleaseIndexFunc(obj interface{}) ([]string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return []string{""}, fmt.Errorf("object has no meta: %v", err)
}
if release, found := meta.GetLabels()["release"]; found {
return []string{release}, nil
}
return []string{""}, errors.New("object has no release label")
}

func (o *KubernikusOperator) initializeCustomInformers() {
//Manually create shared Kluster informer that only watches the given namespace
o.Factories.Kubernikus.InformerFor(
&v1.Kluster{},
func(client kubernikus_clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return kubernikus_informers_v1.NewKlusterInformer(
client,
o.Config.Kubernikus.Namespace,
resyncPeriod,
cache.Indexers{},
)
},
)

//Manually create shared pod Informer that only watches the given namespace
o.Factories.Kubernetes.InformerFor(&api_v1.Pod{}, func(client kubernetes_clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(opt metav1.ListOptions) (runtime.Object, error) {
return client.CoreV1().Pods(o.Config.Kubernikus.Namespace).List(opt)
},
WatchFunc: func(opt metav1.ListOptions) (watch.Interface, error) {
return client.CoreV1().Pods(o.Config.Kubernikus.Namespace).Watch(opt)
},
},
&api_v1.Pod{},
resyncPeriod,
cache.Indexers{"kluster": MetaLabelReleaseIndexFunc},
)
})
}

0 comments on commit 77ada1e

Please sign in to comment.