From 37771a06076e494643c7b111c2b6d183e3093ad1 Mon Sep 17 00:00:00 2001 From: Roberto Bruggemann Date: Fri, 2 Feb 2018 15:49:49 +0000 Subject: [PATCH] Create reflectors asynchronously Reflectors are created and run within the same function, asynchronously from the main thread. Creating reflectors may require calls to the kubernetes api, which can return errors. API errors are not handled in the main thread, but are handled asynchronously by retries. --- probe/kubernetes/client.go | 100 ++++++++++++++++++++++--------------- 1 file changed, 61 insertions(+), 39 deletions(-) diff --git a/probe/kubernetes/client.go b/probe/kubernetes/client.go index 4f2af1d012..0545f64acf 100644 --- a/probe/kubernetes/client.go +++ b/probe/kubernetes/client.go @@ -134,20 +134,17 @@ func NewClient(config ClientConfig) (Client, error) { client: c, } - result.cronJobStore, err = result.setupCronjobStore() - if err != nil { - return nil, err - } - - podStore := NewEventStore(result.triggerPodWatches, cache.MetaNamespaceKeyFunc) - result.podStore = result.setupStore(c.CoreV1().RESTClient(), "pods", &apiv1.Pod{}, podStore) - result.serviceStore = result.setupStore(c.CoreV1().RESTClient(), "services", &apiv1.Service{}, nil) - result.nodeStore = result.setupStore(c.CoreV1().RESTClient(), "nodes", &apiv1.Node{}, nil) - result.namespaceStore = result.setupStore(c.CoreV1().RESTClient(), "namespaces", &apiv1.Namespace{}, nil) - result.deploymentStore = result.setupStore(c.ExtensionsV1beta1().RESTClient(), "deployments", &apiextensionsv1beta1.Deployment{}, nil) - result.daemonSetStore = result.setupStore(c.ExtensionsV1beta1().RESTClient(), "daemonsets", &apiextensionsv1beta1.DaemonSet{}, nil) - result.jobStore = result.setupStore(c.BatchV1().RESTClient(), "jobs", &apibatchv1.Job{}, nil) - result.statefulSetStore = result.setupStore(c.AppsV1beta1().RESTClient(), "statefulsets", &apiappsv1beta1.StatefulSet{}, nil) + result.podStore = NewEventStore(result.triggerPodWatches, cache.MetaNamespaceKeyFunc) + result.runReflectorUntil("pods", result.podStore) + + result.serviceStore = result.setupStore("services") + result.nodeStore = result.setupStore("nodes") + result.namespaceStore = result.setupStore("namespaces") + result.deploymentStore = result.setupStore("deployments") + result.daemonSetStore = result.setupStore("daemonsets") + result.jobStore = result.setupStore("jobs") + result.statefulSetStore = result.setupStore("statefulsets") + result.cronJobStore = result.setupStore("cronjobs") return result, nil } @@ -170,39 +167,56 @@ func (c *client) isResourceSupported(groupVersion schema.GroupVersion, resource return false, nil } -func (c *client) setupStore(kclient rest.Interface, resource string, itemType interface{}, nonDefaultStore cache.Store) cache.Store { - lw := cache.NewListWatchFromClient(kclient, resource, metav1.NamespaceAll, fields.Everything()) - store := nonDefaultStore - if store == nil { - store = cache.NewStore(cache.MetaNamespaceKeyFunc) - } - c.runReflectorUntil(cache.NewReflector(lw, itemType, store, c.resyncPeriod), kclient.APIVersion(), resource) +func (c *client) setupStore(resource string) cache.Store { + store := cache.NewStore(cache.MetaNamespaceKeyFunc) + c.runReflectorUntil(resource, store) return store } -func (c *client) setupCronjobStore() (cache.Store, error) { - const resource = "cronjobs" - ok, err := c.isResourceSupported(c.client.BatchV1beta1().RESTClient().APIVersion(), resource) - if err != nil { - return nil, err - } - if ok { - // kubernetes >= 1.8 - return c.setupStore(c.client.BatchV1beta1().RESTClient(), resource, &apibatchv1beta1.CronJob{}, nil), nil +func (c *client) clientAndType(resource string) (rest.Interface, interface{}, error) { + switch resource { + case "pods": + return c.client.CoreV1().RESTClient(), &apiv1.Pod{}, nil + case "services": + return c.client.CoreV1().RESTClient(), &apiv1.Service{}, nil + case "nodes": + return c.client.CoreV1().RESTClient(), &apiv1.Node{}, nil + case "namespaces": + return c.client.CoreV1().RESTClient(), &apiv1.Namespace{}, nil + case "deployments": + return c.client.ExtensionsV1beta1().RESTClient(), &apiextensionsv1beta1.Deployment{}, nil + case "daemonsets": + return c.client.ExtensionsV1beta1().RESTClient(), &apiextensionsv1beta1.DaemonSet{}, nil + case "jobs": + return c.client.BatchV1().RESTClient(), &apibatchv1.Job{}, nil + case "statefulsets": + return c.client.AppsV1beta1().RESTClient(), &apiappsv1beta1.StatefulSet{}, nil + case "cronjobs": + ok, err := c.isResourceSupported(c.client.BatchV1beta1().RESTClient().APIVersion(), resource) + if err != nil { + return nil, nil, err + } + if ok { + // kubernetes >= 1.8 + return c.client.BatchV1beta1().RESTClient(), &apibatchv1beta1.CronJob{}, nil + } + // kubernetes < 1.8 + return c.client.BatchV2alpha1().RESTClient(), &apibatchv2alpha1.CronJob{}, nil } - // kubernetes < 1.8 - return c.setupStore(c.client.BatchV2alpha1().RESTClient(), resource, &apibatchv2alpha1.CronJob{}, nil), nil + return nil, nil, fmt.Errorf("Invalid resource: %v", resource) } // runReflectorUntil runs cache.Reflector#ListAndWatch in an endless loop, after checking that the resource is supported by kubernetes. // Errors are logged and retried with exponential backoff. -func (c *client) runReflectorUntil(r *cache.Reflector, groupVersion schema.GroupVersion, resource string) { +func (c *client) runReflectorUntil(resource string, store cache.Store) { + var r *cache.Reflector listAndWatch := func() (bool, error) { - select { - case <-c.quit: - return true, nil - default: - ok, err := c.isResourceSupported(groupVersion, resource) + if r == nil { + kclient, itemType, err := c.clientAndType(resource) + if err != nil { + return false, err + } + ok, err := c.isResourceSupported(kclient.APIVersion(), resource) if err != nil { return false, err } @@ -210,7 +224,15 @@ func (c *client) runReflectorUntil(r *cache.Reflector, groupVersion schema.Group log.Infof("%v are not supported by this Kubernetes version", resource) return true, nil } - err = r.ListAndWatch(c.quit) + lw := cache.NewListWatchFromClient(kclient, resource, metav1.NamespaceAll, fields.Everything()) + r = cache.NewReflector(lw, itemType, store, c.resyncPeriod) + } + + select { + case <-c.quit: + return true, nil + default: + err := r.ListAndWatch(c.quit) return false, err } }