Skip to content

Commit

Permalink
Merge pull request #3050 from weaveworks/async-reflectors
Browse files Browse the repository at this point in the history
Create reflectors asynchronously
  • Loading branch information
rbruggem authored Feb 5, 2018
2 parents 35d97bc + 37771a0 commit ca95703
Showing 1 changed file with 61 additions and 39 deletions.
100 changes: 61 additions & 39 deletions probe/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,17 @@ func NewClient(config ClientConfig) (Client, error) {
client: c,
}

result.cronJobStore, err = result.setupCronjobStore()
if err != nil {
return nil, err
}

podStore := NewEventStore(result.triggerPodWatches, cache.MetaNamespaceKeyFunc)
result.podStore = result.setupStore(c.CoreV1().RESTClient(), "pods", &apiv1.Pod{}, podStore)
result.serviceStore = result.setupStore(c.CoreV1().RESTClient(), "services", &apiv1.Service{}, nil)
result.nodeStore = result.setupStore(c.CoreV1().RESTClient(), "nodes", &apiv1.Node{}, nil)
result.namespaceStore = result.setupStore(c.CoreV1().RESTClient(), "namespaces", &apiv1.Namespace{}, nil)
result.deploymentStore = result.setupStore(c.ExtensionsV1beta1().RESTClient(), "deployments", &apiextensionsv1beta1.Deployment{}, nil)
result.daemonSetStore = result.setupStore(c.ExtensionsV1beta1().RESTClient(), "daemonsets", &apiextensionsv1beta1.DaemonSet{}, nil)
result.jobStore = result.setupStore(c.BatchV1().RESTClient(), "jobs", &apibatchv1.Job{}, nil)
result.statefulSetStore = result.setupStore(c.AppsV1beta1().RESTClient(), "statefulsets", &apiappsv1beta1.StatefulSet{}, nil)
result.podStore = NewEventStore(result.triggerPodWatches, cache.MetaNamespaceKeyFunc)
result.runReflectorUntil("pods", result.podStore)

result.serviceStore = result.setupStore("services")
result.nodeStore = result.setupStore("nodes")
result.namespaceStore = result.setupStore("namespaces")
result.deploymentStore = result.setupStore("deployments")
result.daemonSetStore = result.setupStore("daemonsets")
result.jobStore = result.setupStore("jobs")
result.statefulSetStore = result.setupStore("statefulsets")
result.cronJobStore = result.setupStore("cronjobs")

return result, nil
}
Expand All @@ -170,47 +167,72 @@ func (c *client) isResourceSupported(groupVersion schema.GroupVersion, resource
return false, nil
}

func (c *client) setupStore(kclient rest.Interface, resource string, itemType interface{}, nonDefaultStore cache.Store) cache.Store {
lw := cache.NewListWatchFromClient(kclient, resource, metav1.NamespaceAll, fields.Everything())
store := nonDefaultStore
if store == nil {
store = cache.NewStore(cache.MetaNamespaceKeyFunc)
}
c.runReflectorUntil(cache.NewReflector(lw, itemType, store, c.resyncPeriod), kclient.APIVersion(), resource)
func (c *client) setupStore(resource string) cache.Store {
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
c.runReflectorUntil(resource, store)
return store
}

func (c *client) setupCronjobStore() (cache.Store, error) {
const resource = "cronjobs"
ok, err := c.isResourceSupported(c.client.BatchV1beta1().RESTClient().APIVersion(), resource)
if err != nil {
return nil, err
}
if ok {
// kubernetes >= 1.8
return c.setupStore(c.client.BatchV1beta1().RESTClient(), resource, &apibatchv1beta1.CronJob{}, nil), nil
func (c *client) clientAndType(resource string) (rest.Interface, interface{}, error) {
switch resource {
case "pods":
return c.client.CoreV1().RESTClient(), &apiv1.Pod{}, nil
case "services":
return c.client.CoreV1().RESTClient(), &apiv1.Service{}, nil
case "nodes":
return c.client.CoreV1().RESTClient(), &apiv1.Node{}, nil
case "namespaces":
return c.client.CoreV1().RESTClient(), &apiv1.Namespace{}, nil
case "deployments":
return c.client.ExtensionsV1beta1().RESTClient(), &apiextensionsv1beta1.Deployment{}, nil
case "daemonsets":
return c.client.ExtensionsV1beta1().RESTClient(), &apiextensionsv1beta1.DaemonSet{}, nil
case "jobs":
return c.client.BatchV1().RESTClient(), &apibatchv1.Job{}, nil
case "statefulsets":
return c.client.AppsV1beta1().RESTClient(), &apiappsv1beta1.StatefulSet{}, nil
case "cronjobs":
ok, err := c.isResourceSupported(c.client.BatchV1beta1().RESTClient().APIVersion(), resource)
if err != nil {
return nil, nil, err
}
if ok {
// kubernetes >= 1.8
return c.client.BatchV1beta1().RESTClient(), &apibatchv1beta1.CronJob{}, nil
}
// kubernetes < 1.8
return c.client.BatchV2alpha1().RESTClient(), &apibatchv2alpha1.CronJob{}, nil
}
// kubernetes < 1.8
return c.setupStore(c.client.BatchV2alpha1().RESTClient(), resource, &apibatchv2alpha1.CronJob{}, nil), nil
return nil, nil, fmt.Errorf("Invalid resource: %v", resource)
}

// runReflectorUntil runs cache.Reflector#ListAndWatch in an endless loop, after checking that the resource is supported by kubernetes.
// Errors are logged and retried with exponential backoff.
func (c *client) runReflectorUntil(r *cache.Reflector, groupVersion schema.GroupVersion, resource string) {
func (c *client) runReflectorUntil(resource string, store cache.Store) {
var r *cache.Reflector
listAndWatch := func() (bool, error) {
select {
case <-c.quit:
return true, nil
default:
ok, err := c.isResourceSupported(groupVersion, resource)
if r == nil {
kclient, itemType, err := c.clientAndType(resource)
if err != nil {
return false, err
}
ok, err := c.isResourceSupported(kclient.APIVersion(), resource)
if err != nil {
return false, err
}
if !ok {
log.Infof("%v are not supported by this Kubernetes version", resource)
return true, nil
}
err = r.ListAndWatch(c.quit)
lw := cache.NewListWatchFromClient(kclient, resource, metav1.NamespaceAll, fields.Everything())
r = cache.NewReflector(lw, itemType, store, c.resyncPeriod)
}

select {
case <-c.quit:
return true, nil
default:
err := r.ListAndWatch(c.quit)
return false, err
}
}
Expand Down

0 comments on commit ca95703

Please sign in to comment.