Skip to content
This repository has been archived by the owner on Oct 21, 2020. It is now read-only.

Commit

Permalink
Merge pull request #556 from zcahana/shared_informers
Browse files Browse the repository at this point in the history
ProvisionController support for SharedInformers
  • Loading branch information
childsb authored Feb 9, 2018
2 parents e4cff42 + d790a5d commit 9ff8c10
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 33 deletions.
151 changes: 118 additions & 33 deletions lib/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,13 @@ type ProvisionController struct {

claimSource cache.ListerWatcher
claimController cache.Controller
claimInformer cache.SharedInformer
volumeSource cache.ListerWatcher
volumeController cache.Controller
volumeInformer cache.SharedInformer
classSource cache.ListerWatcher
classReflector *cache.Reflector
classController cache.Controller
classInformer cache.SharedInformer

volumes cache.Store
claims cache.Store
Expand Down Expand Up @@ -275,7 +278,46 @@ func TermLimit(termLimit time.Duration) func(*ProvisionController) error {
}
}

// NewProvisionController creates a new provision controller
// ClaimsInformer sets the informer to use for accessing PersistentVolumeClaims.
// Defaults to using a private (non-shared) informer.
func ClaimsInformer(informer cache.SharedInformer) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
return errRuntime
}
c.claimInformer = informer
return nil
}
}

// VolumesInformer sets the informer to use for accessing PersistentVolumes.
// Defaults to using a private (non-shared) informer.
func VolumesInformer(informer cache.SharedInformer) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
return errRuntime
}
c.volumeInformer = informer
return nil
}
}

// ClassesInformer sets the informer to use for accessing StorageClasses.
// The informer must use the versioned resource appropriate for the Kubernetes cluster version
// (that is, v1.StorageClass for >= 1.6, and v1beta1.StorageClass for < 1.6).
// Defaults to using a private (non-shared) informer.
func ClassesInformer(informer cache.SharedInformer) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
return errRuntime
}
c.classInformer = informer
return nil
}
}

// NewProvisionController creates a new provision controller using
// the given configuration parameters and with private (non-shared) informers.
func NewProvisionController(
client kubernetes.Interface,
provisionerName string,
Expand Down Expand Up @@ -328,6 +370,9 @@ func NewProvisionController(
option(controller)
}

// ----------------------
// PersistentVolumeClaims

controller.claimSource = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).List(options)
Expand All @@ -336,16 +381,30 @@ func NewProvisionController(
return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).Watch(options)
},
}
controller.claims, controller.claimController = cache.NewInformer(
controller.claimSource,
&v1.PersistentVolumeClaim{},
controller.resyncPeriod,
cache.ResourceEventHandlerFuncs{
AddFunc: controller.addClaim,
UpdateFunc: controller.updateClaim,
DeleteFunc: nil,
},
)

claimHandler := cache.ResourceEventHandlerFuncs{
AddFunc: controller.addClaim,
UpdateFunc: controller.updateClaim,
DeleteFunc: nil,
}

if controller.claimInformer != nil {
controller.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.resyncPeriod)
controller.claims, controller.claimController =
controller.claimInformer.GetStore(),
controller.claimInformer.GetController()
} else {
controller.claims, controller.claimController =
cache.NewInformer(
controller.claimSource,
&v1.PersistentVolumeClaim{},
controller.resyncPeriod,
claimHandler,
)
}

// -----------------
// PersistentVolumes

controller.volumeSource = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
Expand All @@ -355,19 +414,34 @@ func NewProvisionController(
return client.CoreV1().PersistentVolumes().Watch(options)
},
}
controller.volumes, controller.volumeController = cache.NewInformer(
controller.volumeSource,
&v1.PersistentVolume{},
controller.resyncPeriod,
cache.ResourceEventHandlerFuncs{
AddFunc: nil,
UpdateFunc: controller.updateVolume,
DeleteFunc: nil,
},
)

controller.classes = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
volumeHandler := cache.ResourceEventHandlerFuncs{
AddFunc: nil,
UpdateFunc: controller.updateVolume,
DeleteFunc: nil,
}

if controller.volumeInformer != nil {
controller.volumeInformer.AddEventHandlerWithResyncPeriod(volumeHandler, controller.resyncPeriod)
controller.volumes, controller.volumeController =
controller.volumeInformer.GetStore(),
controller.volumeInformer.GetController()
} else {
controller.volumes, controller.volumeController =
cache.NewInformer(
controller.volumeSource,
&v1.PersistentVolume{},
controller.resyncPeriod,
volumeHandler,
)
}

// --------------
// StorageClasses

var versionedClassType runtime.Object
if controller.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.6.0")) {
versionedClassType = &storage.StorageClass{}
controller.classSource = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.StorageV1().StorageClasses().List(options)
Expand All @@ -376,13 +450,8 @@ func NewProvisionController(
return client.StorageV1().StorageClasses().Watch(options)
},
}
controller.classReflector = cache.NewReflector(
controller.classSource,
&storage.StorageClass{},
controller.classes,
controller.resyncPeriod,
)
} else {
versionedClassType = &storagebeta.StorageClass{}
controller.classSource = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.StorageV1beta1().StorageClasses().List(options)
Expand All @@ -391,11 +460,27 @@ func NewProvisionController(
return client.StorageV1beta1().StorageClasses().Watch(options)
},
}
controller.classReflector = cache.NewReflector(
}

classHandler := cache.ResourceEventHandlerFuncs{
// We don't need an actual event handler for StorageClasses,
// but we must pass a non-nil one to cache.NewInformer()
AddFunc: nil,
UpdateFunc: nil,
DeleteFunc: nil,
}

if controller.classInformer != nil {
// no resource event handler needed for StorageClasses
controller.classes, controller.classController =
controller.classInformer.GetStore(),
controller.classInformer.GetController()
} else {
controller.classes, controller.classController = cache.NewInformer(
controller.classSource,
&storagebeta.StorageClass{},
controller.classes,
versionedClassType,
controller.resyncPeriod,
classHandler,
)
}

Expand All @@ -410,7 +495,7 @@ func (ctrl *ProvisionController) Run(stopCh <-chan struct{}) {
ctrl.hasRunLock.Unlock()
go ctrl.claimController.Run(stopCh)
go ctrl.volumeController.Run(stopCh)
go ctrl.classReflector.Run(stopCh)
go ctrl.classController.Run(stopCh)
<-stopCh
}

Expand Down
111 changes: 111 additions & 0 deletions lib/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,21 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
fakev1core "k8s.io/client-go/kubernetes/typed/core/v1/fake"
testclient "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
fcache "k8s.io/client-go/tools/cache/testing"
ref "k8s.io/client-go/tools/reference"
utilversion "k8s.io/kubernetes/pkg/util/version"
)

const (
resyncPeriod = 100 * time.Millisecond
sharedResyncPeriod = 1 * time.Second
defaultServerVersion = "v1.5.0"
)

Expand Down Expand Up @@ -510,6 +514,76 @@ func TestIsOnlyRecordUpdate(t *testing.T) {
}
}

func TestControllerSharedInformers(t *testing.T) {
tests := []struct {
name string
objs []runtime.Object
provisionerName string
expectedVolumes []v1.PersistentVolume
serverVersion string
}{
{
name: "provision for claim-1 with v1beta1 storage class",
objs: []runtime.Object{
newStorageClass("class-1", "foo.bar/baz"),
newClaim("claim-1", "uid-1-1", "class-1", "foo.bar/baz", "", nil),
},
provisionerName: "foo.bar/baz",
serverVersion: "v1.5.0",
expectedVolumes: []v1.PersistentVolume{
*newProvisionedVolume(newStorageClass("class-1", "foo.bar/baz"), newClaim("claim-1", "uid-1-1", "class-1", "foo.bar/baz", "", nil)),
},
},
{
name: "provision for claim-1 with v1 storage class",
objs: []runtime.Object{
newStorageClassWithSpecifiedReclaimPolicy("class-1", "foo.bar/baz", v1.PersistentVolumeReclaimDelete),
newClaim("claim-1", "uid-1-1", "class-1", "foo.bar/baz", "", nil),
},
provisionerName: "foo.bar/baz",
serverVersion: "v1.8.0",
expectedVolumes: []v1.PersistentVolume{
*newProvisionedVolumeWithSpecifiedReclaimPolicy(newStorageClassWithSpecifiedReclaimPolicy("class-1", "foo.bar/baz", v1.PersistentVolumeReclaimDelete), newClaim("claim-1", "uid-1-1", "class-1", "foo.bar/baz", "", nil)),
},
},
{
name: "delete volume-1",
objs: []runtime.Object{
newVolume("volume-1", v1.VolumeReleased, v1.PersistentVolumeReclaimDelete, map[string]string{annDynamicallyProvisioned: "foo.bar/baz"}),
},
provisionerName: "foo.bar/baz",
expectedVolumes: []v1.PersistentVolume{},
},
}

for _, test := range tests {
client := fake.NewSimpleClientset(test.objs...)

serverVersion := defaultServerVersion
if test.serverVersion != "" {
serverVersion = test.serverVersion
}
ctrl, informersFactory := newTestProvisionControllerSharedInformers(client, test.provisionerName,
newTestProvisioner(), serverVersion, sharedResyncPeriod)
stopCh := make(chan struct{})

go ctrl.Run(stopCh)
go informersFactory.Start(stopCh)

informersFactory.WaitForCacheSync(stopCh)
time.Sleep(2 * sharedResyncPeriod)
ctrl.runningOperations.Wait()

pvList, _ := client.Core().PersistentVolumes().List(metav1.ListOptions{})
if (len(test.expectedVolumes) > 0 || len(pvList.Items) > 0) &&
!reflect.DeepEqual(test.expectedVolumes, pvList.Items) {
t.Logf("test case: %s", test.name)
t.Errorf("expected PVs:\n %v\n but got:\n %v\n", test.expectedVolumes, pvList.Items)
}
close(stopCh)
}
}

func newTestProvisionController(
client kubernetes.Interface,
provisionerName string,
Expand All @@ -531,6 +605,43 @@ func newTestProvisionController(
return ctrl
}

func newTestProvisionControllerSharedInformers(
client kubernetes.Interface,
provisionerName string,
provisioner Provisioner,
serverGitVersion string,
resyncPeriod time.Duration,
) (*ProvisionController, informers.SharedInformerFactory) {

informerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)
claimInformer := informerFactory.Core().V1().PersistentVolumeClaims().Informer()
volumeInformer := informerFactory.Core().V1().PersistentVolumes().Informer()
classInformer := func() cache.SharedIndexInformer {
if utilversion.MustParseSemantic(serverGitVersion).AtLeast(utilversion.MustParseSemantic("v1.6.0")) {
return informerFactory.Storage().V1().StorageClasses().Informer()
}
return informerFactory.Storage().V1beta1().StorageClasses().Informer()
}()

ctrl := NewProvisionController(
client,
provisionerName,
provisioner,
serverGitVersion,
ResyncPeriod(resyncPeriod),
ExponentialBackOffOnError(false),
CreateProvisionedPVInterval(10*time.Millisecond),
LeaseDuration(2*resyncPeriod),
RenewDeadline(resyncPeriod),
RetryPeriod(resyncPeriod/2),
TermLimit(2*resyncPeriod),
ClaimsInformer(claimInformer),
VolumesInformer(volumeInformer),
ClassesInformer(classInformer))

return ctrl, informerFactory
}

func newStorageClass(name, provisioner string) *storagebeta.StorageClass {
defaultReclaimPolicy := v1.PersistentVolumeReclaimDelete

Expand Down

0 comments on commit 9ff8c10

Please sign in to comment.