Skip to content

Commit

Permalink
Create reflectors asynchronously
Browse files Browse the repository at this point in the history
Reflectors are created and run within the same function, asynchronously from the main thread.
Creating reflectors may require calls to the kubernetes api, which can return errors.
API errors are not handled in the main thread, but are handled asynchronously by retries.
  • Loading branch information
Roberto Bruggemann committed Feb 5, 2018
1 parent 35d97bc commit 37771a0
Showing 1 changed file with 61 additions and 39 deletions.
100 changes: 61 additions & 39 deletions probe/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,17 @@ func NewClient(config ClientConfig) (Client, error) {
client: c,
}

result.cronJobStore, err = result.setupCronjobStore()
if err != nil {
return nil, err
}

podStore := NewEventStore(result.triggerPodWatches, cache.MetaNamespaceKeyFunc)
result.podStore = result.setupStore(c.CoreV1().RESTClient(), "pods", &apiv1.Pod{}, podStore)
result.serviceStore = result.setupStore(c.CoreV1().RESTClient(), "services", &apiv1.Service{}, nil)
result.nodeStore = result.setupStore(c.CoreV1().RESTClient(), "nodes", &apiv1.Node{}, nil)
result.namespaceStore = result.setupStore(c.CoreV1().RESTClient(), "namespaces", &apiv1.Namespace{}, nil)
result.deploymentStore = result.setupStore(c.ExtensionsV1beta1().RESTClient(), "deployments", &apiextensionsv1beta1.Deployment{}, nil)
result.daemonSetStore = result.setupStore(c.ExtensionsV1beta1().RESTClient(), "daemonsets", &apiextensionsv1beta1.DaemonSet{}, nil)
result.jobStore = result.setupStore(c.BatchV1().RESTClient(), "jobs", &apibatchv1.Job{}, nil)
result.statefulSetStore = result.setupStore(c.AppsV1beta1().RESTClient(), "statefulsets", &apiappsv1beta1.StatefulSet{}, nil)
result.podStore = NewEventStore(result.triggerPodWatches, cache.MetaNamespaceKeyFunc)
result.runReflectorUntil("pods", result.podStore)

result.serviceStore = result.setupStore("services")
result.nodeStore = result.setupStore("nodes")
result.namespaceStore = result.setupStore("namespaces")
result.deploymentStore = result.setupStore("deployments")
result.daemonSetStore = result.setupStore("daemonsets")
result.jobStore = result.setupStore("jobs")
result.statefulSetStore = result.setupStore("statefulsets")
result.cronJobStore = result.setupStore("cronjobs")

return result, nil
}
Expand All @@ -170,47 +167,72 @@ func (c *client) isResourceSupported(groupVersion schema.GroupVersion, resource
return false, nil
}

func (c *client) setupStore(kclient rest.Interface, resource string, itemType interface{}, nonDefaultStore cache.Store) cache.Store {
lw := cache.NewListWatchFromClient(kclient, resource, metav1.NamespaceAll, fields.Everything())
store := nonDefaultStore
if store == nil {
store = cache.NewStore(cache.MetaNamespaceKeyFunc)
}
c.runReflectorUntil(cache.NewReflector(lw, itemType, store, c.resyncPeriod), kclient.APIVersion(), resource)
func (c *client) setupStore(resource string) cache.Store {
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
c.runReflectorUntil(resource, store)
return store
}

func (c *client) setupCronjobStore() (cache.Store, error) {
const resource = "cronjobs"
ok, err := c.isResourceSupported(c.client.BatchV1beta1().RESTClient().APIVersion(), resource)
if err != nil {
return nil, err
}
if ok {
// kubernetes >= 1.8
return c.setupStore(c.client.BatchV1beta1().RESTClient(), resource, &apibatchv1beta1.CronJob{}, nil), nil
func (c *client) clientAndType(resource string) (rest.Interface, interface{}, error) {
switch resource {
case "pods":
return c.client.CoreV1().RESTClient(), &apiv1.Pod{}, nil
case "services":
return c.client.CoreV1().RESTClient(), &apiv1.Service{}, nil
case "nodes":
return c.client.CoreV1().RESTClient(), &apiv1.Node{}, nil
case "namespaces":
return c.client.CoreV1().RESTClient(), &apiv1.Namespace{}, nil
case "deployments":
return c.client.ExtensionsV1beta1().RESTClient(), &apiextensionsv1beta1.Deployment{}, nil
case "daemonsets":
return c.client.ExtensionsV1beta1().RESTClient(), &apiextensionsv1beta1.DaemonSet{}, nil
case "jobs":
return c.client.BatchV1().RESTClient(), &apibatchv1.Job{}, nil
case "statefulsets":
return c.client.AppsV1beta1().RESTClient(), &apiappsv1beta1.StatefulSet{}, nil
case "cronjobs":
ok, err := c.isResourceSupported(c.client.BatchV1beta1().RESTClient().APIVersion(), resource)
if err != nil {
return nil, nil, err
}
if ok {
// kubernetes >= 1.8
return c.client.BatchV1beta1().RESTClient(), &apibatchv1beta1.CronJob{}, nil
}
// kubernetes < 1.8
return c.client.BatchV2alpha1().RESTClient(), &apibatchv2alpha1.CronJob{}, nil
}
// kubernetes < 1.8
return c.setupStore(c.client.BatchV2alpha1().RESTClient(), resource, &apibatchv2alpha1.CronJob{}, nil), nil
return nil, nil, fmt.Errorf("Invalid resource: %v", resource)
}

// runReflectorUntil runs cache.Reflector#ListAndWatch in an endless loop, after checking that the resource is supported by kubernetes.
// Errors are logged and retried with exponential backoff.
func (c *client) runReflectorUntil(r *cache.Reflector, groupVersion schema.GroupVersion, resource string) {
func (c *client) runReflectorUntil(resource string, store cache.Store) {
var r *cache.Reflector
listAndWatch := func() (bool, error) {
select {
case <-c.quit:
return true, nil
default:
ok, err := c.isResourceSupported(groupVersion, resource)
if r == nil {
kclient, itemType, err := c.clientAndType(resource)
if err != nil {
return false, err
}
ok, err := c.isResourceSupported(kclient.APIVersion(), resource)
if err != nil {
return false, err
}
if !ok {
log.Infof("%v are not supported by this Kubernetes version", resource)
return true, nil
}
err = r.ListAndWatch(c.quit)
lw := cache.NewListWatchFromClient(kclient, resource, metav1.NamespaceAll, fields.Everything())
r = cache.NewReflector(lw, itemType, store, c.resyncPeriod)
}

select {
case <-c.quit:
return true, nil
default:
err := r.ListAndWatch(c.quit)
return false, err
}
}
Expand Down

0 comments on commit 37771a0

Please sign in to comment.