diff --git a/lib/controller/controller.go b/lib/controller/controller.go index 28799fdeccd..36924dda2cd 100644 --- a/lib/controller/controller.go +++ b/lib/controller/controller.go @@ -90,10 +90,13 @@ type ProvisionController struct { claimSource cache.ListerWatcher claimController cache.Controller + claimInformer cache.SharedInformer volumeSource cache.ListerWatcher volumeController cache.Controller + volumeInformer cache.SharedInformer classSource cache.ListerWatcher - classReflector *cache.Reflector + classController cache.Controller + classInformer cache.SharedInformer volumes cache.Store claims cache.Store @@ -275,7 +278,46 @@ func TermLimit(termLimit time.Duration) func(*ProvisionController) error { } } -// NewProvisionController creates a new provision controller +// ClaimsInformer sets the informer to use for accessing PersistentVolumeClaims. +// Defaults to using a private (non-shared) informer. +func ClaimsInformer(informer cache.SharedInformer) func(*ProvisionController) error { + return func(c *ProvisionController) error { + if c.HasRun() { + return errRuntime + } + c.claimInformer = informer + return nil + } +} + +// VolumesInformer sets the informer to use for accessing PersistentVolumes. +// Defaults to using a private (non-shared) informer. +func VolumesInformer(informer cache.SharedInformer) func(*ProvisionController) error { + return func(c *ProvisionController) error { + if c.HasRun() { + return errRuntime + } + c.volumeInformer = informer + return nil + } +} + +// ClassesInformer sets the informer to use for accessing StorageClasses. +// The informer must use the versioned resource appropriate for the Kubernetes cluster version +// (that is, v1.StorageClass for >= 1.6, and v1beta1.StorageClass for < 1.6). +// Defaults to using a private (non-shared) informer. +func ClassesInformer(informer cache.SharedInformer) func(*ProvisionController) error { + return func(c *ProvisionController) error { + if c.HasRun() { + return errRuntime + } + c.classInformer = informer + return nil + } +} + +// NewProvisionController creates a new provision controller using +// the given configuration parameters and with private (non-shared) informers. func NewProvisionController( client kubernetes.Interface, provisionerName string, @@ -328,6 +370,9 @@ func NewProvisionController( option(controller) } + // ---------------------- + // PersistentVolumeClaims + controller.claimSource = &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).List(options) @@ -336,16 +381,30 @@ func NewProvisionController( return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).Watch(options) }, } - controller.claims, controller.claimController = cache.NewInformer( - controller.claimSource, - &v1.PersistentVolumeClaim{}, - controller.resyncPeriod, - cache.ResourceEventHandlerFuncs{ - AddFunc: controller.addClaim, - UpdateFunc: controller.updateClaim, - DeleteFunc: nil, - }, - ) + + claimHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: controller.addClaim, + UpdateFunc: controller.updateClaim, + DeleteFunc: nil, + } + + if controller.claimInformer != nil { + controller.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.resyncPeriod) + controller.claims, controller.claimController = + controller.claimInformer.GetStore(), + controller.claimInformer.GetController() + } else { + controller.claims, controller.claimController = + cache.NewInformer( + controller.claimSource, + &v1.PersistentVolumeClaim{}, + controller.resyncPeriod, + claimHandler, + ) + } + + // ----------------- + // PersistentVolumes controller.volumeSource = &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { @@ -355,19 +414,34 @@ func NewProvisionController( return client.CoreV1().PersistentVolumes().Watch(options) }, } - controller.volumes, controller.volumeController = cache.NewInformer( - controller.volumeSource, - &v1.PersistentVolume{}, - controller.resyncPeriod, - cache.ResourceEventHandlerFuncs{ - AddFunc: nil, - UpdateFunc: controller.updateVolume, - DeleteFunc: nil, - }, - ) - controller.classes = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) + volumeHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: nil, + UpdateFunc: controller.updateVolume, + DeleteFunc: nil, + } + + if controller.volumeInformer != nil { + controller.volumeInformer.AddEventHandlerWithResyncPeriod(volumeHandler, controller.resyncPeriod) + controller.volumes, controller.volumeController = + controller.volumeInformer.GetStore(), + controller.volumeInformer.GetController() + } else { + controller.volumes, controller.volumeController = + cache.NewInformer( + controller.volumeSource, + &v1.PersistentVolume{}, + controller.resyncPeriod, + volumeHandler, + ) + } + + // -------------- + // StorageClasses + + var versionedClassType runtime.Object if controller.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.6.0")) { + versionedClassType = &storage.StorageClass{} controller.classSource = &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { return client.StorageV1().StorageClasses().List(options) @@ -376,13 +450,8 @@ func NewProvisionController( return client.StorageV1().StorageClasses().Watch(options) }, } - controller.classReflector = cache.NewReflector( - controller.classSource, - &storage.StorageClass{}, - controller.classes, - controller.resyncPeriod, - ) } else { + versionedClassType = &storagebeta.StorageClass{} controller.classSource = &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { return client.StorageV1beta1().StorageClasses().List(options) @@ -391,11 +460,27 @@ func NewProvisionController( return client.StorageV1beta1().StorageClasses().Watch(options) }, } - controller.classReflector = cache.NewReflector( + } + + classHandler := cache.ResourceEventHandlerFuncs{ + // We don't need an actual event handler for StorageClasses, + // but we must pass a non-nil one to cache.NewInformer() + AddFunc: nil, + UpdateFunc: nil, + DeleteFunc: nil, + } + + if controller.classInformer != nil { + // no resource event handler needed for StorageClasses + controller.classes, controller.classController = + controller.classInformer.GetStore(), + controller.classInformer.GetController() + } else { + controller.classes, controller.classController = cache.NewInformer( controller.classSource, - &storagebeta.StorageClass{}, - controller.classes, + versionedClassType, controller.resyncPeriod, + classHandler, ) } @@ -410,7 +495,7 @@ func (ctrl *ProvisionController) Run(stopCh <-chan struct{}) { ctrl.hasRunLock.Unlock() go ctrl.claimController.Run(stopCh) go ctrl.volumeController.Run(stopCh) - go ctrl.classReflector.Run(stopCh) + go ctrl.classController.Run(stopCh) <-stopCh } diff --git a/lib/controller/controller_test.go b/lib/controller/controller_test.go index b70ab811e19..3907da1fa85 100644 --- a/lib/controller/controller_test.go +++ b/lib/controller/controller_test.go @@ -34,17 +34,21 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" fakev1core "k8s.io/client-go/kubernetes/typed/core/v1/fake" testclient "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" fcache "k8s.io/client-go/tools/cache/testing" ref "k8s.io/client-go/tools/reference" + utilversion "k8s.io/kubernetes/pkg/util/version" ) const ( resyncPeriod = 100 * time.Millisecond + sharedResyncPeriod = 1 * time.Second defaultServerVersion = "v1.5.0" ) @@ -510,6 +514,76 @@ func TestIsOnlyRecordUpdate(t *testing.T) { } } +func TestControllerSharedInformers(t *testing.T) { + tests := []struct { + name string + objs []runtime.Object + provisionerName string + expectedVolumes []v1.PersistentVolume + serverVersion string + }{ + { + name: "provision for claim-1 with v1beta1 storage class", + objs: []runtime.Object{ + newStorageClass("class-1", "foo.bar/baz"), + newClaim("claim-1", "uid-1-1", "class-1", "foo.bar/baz", "", nil), + }, + provisionerName: "foo.bar/baz", + serverVersion: "v1.5.0", + expectedVolumes: []v1.PersistentVolume{ + *newProvisionedVolume(newStorageClass("class-1", "foo.bar/baz"), newClaim("claim-1", "uid-1-1", "class-1", "foo.bar/baz", "", nil)), + }, + }, + { + name: "provision for claim-1 with v1 storage class", + objs: []runtime.Object{ + newStorageClassWithSpecifiedReclaimPolicy("class-1", "foo.bar/baz", v1.PersistentVolumeReclaimDelete), + newClaim("claim-1", "uid-1-1", "class-1", "foo.bar/baz", "", nil), + }, + provisionerName: "foo.bar/baz", + serverVersion: "v1.8.0", + expectedVolumes: []v1.PersistentVolume{ + *newProvisionedVolumeWithSpecifiedReclaimPolicy(newStorageClassWithSpecifiedReclaimPolicy("class-1", "foo.bar/baz", v1.PersistentVolumeReclaimDelete), newClaim("claim-1", "uid-1-1", "class-1", "foo.bar/baz", "", nil)), + }, + }, + { + name: "delete volume-1", + objs: []runtime.Object{ + newVolume("volume-1", v1.VolumeReleased, v1.PersistentVolumeReclaimDelete, map[string]string{annDynamicallyProvisioned: "foo.bar/baz"}), + }, + provisionerName: "foo.bar/baz", + expectedVolumes: []v1.PersistentVolume{}, + }, + } + + for _, test := range tests { + client := fake.NewSimpleClientset(test.objs...) + + serverVersion := defaultServerVersion + if test.serverVersion != "" { + serverVersion = test.serverVersion + } + ctrl, informersFactory := newTestProvisionControllerSharedInformers(client, test.provisionerName, + newTestProvisioner(), serverVersion, sharedResyncPeriod) + stopCh := make(chan struct{}) + + go ctrl.Run(stopCh) + go informersFactory.Start(stopCh) + + informersFactory.WaitForCacheSync(stopCh) + time.Sleep(2 * sharedResyncPeriod) + ctrl.runningOperations.Wait() + + pvList, _ := client.Core().PersistentVolumes().List(metav1.ListOptions{}) + if (len(test.expectedVolumes) > 0 || len(pvList.Items) > 0) && + !reflect.DeepEqual(test.expectedVolumes, pvList.Items) { + t.Logf("test case: %s", test.name) + t.Errorf("expected PVs:\n %v\n but got:\n %v\n", test.expectedVolumes, pvList.Items) + } + close(stopCh) + } +} + func newTestProvisionController( client kubernetes.Interface, provisionerName string, @@ -531,6 +605,43 @@ func newTestProvisionController( return ctrl } +func newTestProvisionControllerSharedInformers( + client kubernetes.Interface, + provisionerName string, + provisioner Provisioner, + serverGitVersion string, + resyncPeriod time.Duration, +) (*ProvisionController, informers.SharedInformerFactory) { + + informerFactory := informers.NewSharedInformerFactory(client, resyncPeriod) + claimInformer := informerFactory.Core().V1().PersistentVolumeClaims().Informer() + volumeInformer := informerFactory.Core().V1().PersistentVolumes().Informer() + classInformer := func() cache.SharedIndexInformer { + if utilversion.MustParseSemantic(serverGitVersion).AtLeast(utilversion.MustParseSemantic("v1.6.0")) { + return informerFactory.Storage().V1().StorageClasses().Informer() + } + return informerFactory.Storage().V1beta1().StorageClasses().Informer() + }() + + ctrl := NewProvisionController( + client, + provisionerName, + provisioner, + serverGitVersion, + ResyncPeriod(resyncPeriod), + ExponentialBackOffOnError(false), + CreateProvisionedPVInterval(10*time.Millisecond), + LeaseDuration(2*resyncPeriod), + RenewDeadline(resyncPeriod), + RetryPeriod(resyncPeriod/2), + TermLimit(2*resyncPeriod), + ClaimsInformer(claimInformer), + VolumesInformer(volumeInformer), + ClassesInformer(classInformer)) + + return ctrl, informerFactory +} + func newStorageClass(name, provisioner string) *storagebeta.StorageClass { defaultReclaimPolicy := v1.PersistentVolumeReclaimDelete