Skip to content
This repository has been archived by the owner on Oct 21, 2020. It is now read-only.

ProvisionController support for SharedInformers #556

Merged
merged 2 commits into from
Feb 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 118 additions & 33 deletions lib/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,13 @@ type ProvisionController struct {

claimSource cache.ListerWatcher
claimController cache.Controller
claimInformer cache.SharedInformer
volumeSource cache.ListerWatcher
volumeController cache.Controller
volumeInformer cache.SharedInformer
classSource cache.ListerWatcher
classReflector *cache.Reflector
classController cache.Controller
classInformer cache.SharedInformer

volumes cache.Store
claims cache.Store
Expand Down Expand Up @@ -275,7 +278,46 @@ func TermLimit(termLimit time.Duration) func(*ProvisionController) error {
}
}

// NewProvisionController creates a new provision controller
// ClaimsInformer sets the informer to use for accessing PersistentVolumeClaims.
// Defaults to using a private (non-shared) informer.
func ClaimsInformer(informer cache.SharedInformer) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
return errRuntime
}
c.claimInformer = informer
return nil
}
}

// VolumesInformer sets the informer to use for accessing PersistentVolumes.
// Defaults to using a private (non-shared) informer.
func VolumesInformer(informer cache.SharedInformer) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
return errRuntime
}
c.volumeInformer = informer
return nil
}
}

// ClassesInformer sets the informer to use for accessing StorageClasses.
// The informer must use the versioned resource appropriate for the Kubernetes cluster version
// (that is, v1.StorageClass for >= 1.6, and v1beta1.StorageClass for < 1.6).
// Defaults to using a private (non-shared) informer.
func ClassesInformer(informer cache.SharedInformer) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
return errRuntime
}
c.classInformer = informer
return nil
}
}

// NewProvisionController creates a new provision controller using
// the given configuration parameters and with private (non-shared) informers.
func NewProvisionController(
client kubernetes.Interface,
provisionerName string,
Expand Down Expand Up @@ -328,6 +370,9 @@ func NewProvisionController(
option(controller)
}

// ----------------------
// PersistentVolumeClaims

controller.claimSource = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).List(options)
Expand All @@ -336,16 +381,30 @@ func NewProvisionController(
return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).Watch(options)
},
}
controller.claims, controller.claimController = cache.NewInformer(
controller.claimSource,
&v1.PersistentVolumeClaim{},
controller.resyncPeriod,
cache.ResourceEventHandlerFuncs{
AddFunc: controller.addClaim,
UpdateFunc: controller.updateClaim,
DeleteFunc: nil,
},
)

claimHandler := cache.ResourceEventHandlerFuncs{
AddFunc: controller.addClaim,
UpdateFunc: controller.updateClaim,
DeleteFunc: nil,
}

if controller.claimInformer != nil {
controller.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.resyncPeriod)
controller.claims, controller.claimController =
controller.claimInformer.GetStore(),
controller.claimInformer.GetController()
} else {
controller.claims, controller.claimController =
cache.NewInformer(
controller.claimSource,
&v1.PersistentVolumeClaim{},
controller.resyncPeriod,
claimHandler,
)
}

// -----------------
// PersistentVolumes

controller.volumeSource = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
Expand All @@ -355,19 +414,34 @@ func NewProvisionController(
return client.CoreV1().PersistentVolumes().Watch(options)
},
}
controller.volumes, controller.volumeController = cache.NewInformer(
controller.volumeSource,
&v1.PersistentVolume{},
controller.resyncPeriod,
cache.ResourceEventHandlerFuncs{
AddFunc: nil,
UpdateFunc: controller.updateVolume,
DeleteFunc: nil,
},
)

controller.classes = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
volumeHandler := cache.ResourceEventHandlerFuncs{
AddFunc: nil,
UpdateFunc: controller.updateVolume,
DeleteFunc: nil,
}

if controller.volumeInformer != nil {
controller.volumeInformer.AddEventHandlerWithResyncPeriod(volumeHandler, controller.resyncPeriod)
controller.volumes, controller.volumeController =
controller.volumeInformer.GetStore(),
controller.volumeInformer.GetController()
} else {
controller.volumes, controller.volumeController =
cache.NewInformer(
controller.volumeSource,
&v1.PersistentVolume{},
controller.resyncPeriod,
volumeHandler,
)
}

// --------------
// StorageClasses

var versionedClassType runtime.Object
if controller.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.6.0")) {
versionedClassType = &storage.StorageClass{}
controller.classSource = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.StorageV1().StorageClasses().List(options)
Expand All @@ -376,13 +450,8 @@ func NewProvisionController(
return client.StorageV1().StorageClasses().Watch(options)
},
}
controller.classReflector = cache.NewReflector(
controller.classSource,
&storage.StorageClass{},
controller.classes,
controller.resyncPeriod,
)
} else {
versionedClassType = &storagebeta.StorageClass{}
controller.classSource = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.StorageV1beta1().StorageClasses().List(options)
Expand All @@ -391,11 +460,27 @@ func NewProvisionController(
return client.StorageV1beta1().StorageClasses().Watch(options)
},
}
controller.classReflector = cache.NewReflector(
}

classHandler := cache.ResourceEventHandlerFuncs{
// We don't need an actual event handler for StorageClasses,
// but we must pass a non-nil one to cache.NewInformer()
AddFunc: nil,
UpdateFunc: nil,
DeleteFunc: nil,
}

if controller.classInformer != nil {
// no resource event handler needed for StorageClasses
controller.classes, controller.classController =
controller.classInformer.GetStore(),
controller.classInformer.GetController()
} else {
controller.classes, controller.classController = cache.NewInformer(
controller.classSource,
&storagebeta.StorageClass{},
controller.classes,
versionedClassType,
controller.resyncPeriod,
classHandler,
)
}

Expand All @@ -410,7 +495,7 @@ func (ctrl *ProvisionController) Run(stopCh <-chan struct{}) {
ctrl.hasRunLock.Unlock()
go ctrl.claimController.Run(stopCh)
go ctrl.volumeController.Run(stopCh)
go ctrl.classReflector.Run(stopCh)
go ctrl.classController.Run(stopCh)
<-stopCh
}

Expand Down
111 changes: 111 additions & 0 deletions lib/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,21 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
fakev1core "k8s.io/client-go/kubernetes/typed/core/v1/fake"
testclient "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
fcache "k8s.io/client-go/tools/cache/testing"
ref "k8s.io/client-go/tools/reference"
utilversion "k8s.io/kubernetes/pkg/util/version"
)

const (
resyncPeriod = 100 * time.Millisecond
sharedResyncPeriod = 1 * time.Second
defaultServerVersion = "v1.5.0"
)

Expand Down Expand Up @@ -510,6 +514,76 @@ func TestIsOnlyRecordUpdate(t *testing.T) {
}
}

func TestControllerSharedInformers(t *testing.T) {
tests := []struct {
name string
objs []runtime.Object
provisionerName string
expectedVolumes []v1.PersistentVolume
serverVersion string
}{
{
name: "provision for claim-1 with v1beta1 storage class",
objs: []runtime.Object{
newStorageClass("class-1", "foo.bar/baz"),
newClaim("claim-1", "uid-1-1", "class-1", "foo.bar/baz", "", nil),
},
provisionerName: "foo.bar/baz",
serverVersion: "v1.5.0",
expectedVolumes: []v1.PersistentVolume{
*newProvisionedVolume(newStorageClass("class-1", "foo.bar/baz"), newClaim("claim-1", "uid-1-1", "class-1", "foo.bar/baz", "", nil)),
},
},
{
name: "provision for claim-1 with v1 storage class",
objs: []runtime.Object{
newStorageClassWithSpecifiedReclaimPolicy("class-1", "foo.bar/baz", v1.PersistentVolumeReclaimDelete),
newClaim("claim-1", "uid-1-1", "class-1", "foo.bar/baz", "", nil),
},
provisionerName: "foo.bar/baz",
serverVersion: "v1.8.0",
expectedVolumes: []v1.PersistentVolume{
*newProvisionedVolumeWithSpecifiedReclaimPolicy(newStorageClassWithSpecifiedReclaimPolicy("class-1", "foo.bar/baz", v1.PersistentVolumeReclaimDelete), newClaim("claim-1", "uid-1-1", "class-1", "foo.bar/baz", "", nil)),
},
},
{
name: "delete volume-1",
objs: []runtime.Object{
newVolume("volume-1", v1.VolumeReleased, v1.PersistentVolumeReclaimDelete, map[string]string{annDynamicallyProvisioned: "foo.bar/baz"}),
},
provisionerName: "foo.bar/baz",
expectedVolumes: []v1.PersistentVolume{},
},
}

for _, test := range tests {
client := fake.NewSimpleClientset(test.objs...)

serverVersion := defaultServerVersion
if test.serverVersion != "" {
serverVersion = test.serverVersion
}
ctrl, informersFactory := newTestProvisionControllerSharedInformers(client, test.provisionerName,
newTestProvisioner(), serverVersion, sharedResyncPeriod)
stopCh := make(chan struct{})

go ctrl.Run(stopCh)
go informersFactory.Start(stopCh)

informersFactory.WaitForCacheSync(stopCh)
time.Sleep(2 * sharedResyncPeriod)
ctrl.runningOperations.Wait()

pvList, _ := client.Core().PersistentVolumes().List(metav1.ListOptions{})
if (len(test.expectedVolumes) > 0 || len(pvList.Items) > 0) &&
!reflect.DeepEqual(test.expectedVolumes, pvList.Items) {
t.Logf("test case: %s", test.name)
t.Errorf("expected PVs:\n %v\n but got:\n %v\n", test.expectedVolumes, pvList.Items)
}
close(stopCh)
}
}

func newTestProvisionController(
client kubernetes.Interface,
provisionerName string,
Expand All @@ -531,6 +605,43 @@ func newTestProvisionController(
return ctrl
}

func newTestProvisionControllerSharedInformers(
client kubernetes.Interface,
provisionerName string,
provisioner Provisioner,
serverGitVersion string,
resyncPeriod time.Duration,
) (*ProvisionController, informers.SharedInformerFactory) {

informerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)
claimInformer := informerFactory.Core().V1().PersistentVolumeClaims().Informer()
volumeInformer := informerFactory.Core().V1().PersistentVolumes().Informer()
classInformer := func() cache.SharedIndexInformer {
if utilversion.MustParseSemantic(serverGitVersion).AtLeast(utilversion.MustParseSemantic("v1.6.0")) {
return informerFactory.Storage().V1().StorageClasses().Informer()
}
return informerFactory.Storage().V1beta1().StorageClasses().Informer()
}()

ctrl := NewProvisionController(
client,
provisionerName,
provisioner,
serverGitVersion,
ResyncPeriod(resyncPeriod),
ExponentialBackOffOnError(false),
CreateProvisionedPVInterval(10*time.Millisecond),
LeaseDuration(2*resyncPeriod),
RenewDeadline(resyncPeriod),
RetryPeriod(resyncPeriod/2),
TermLimit(2*resyncPeriod),
ClaimsInformer(claimInformer),
VolumesInformer(volumeInformer),
ClassesInformer(classInformer))

return ctrl, informerFactory
}

func newStorageClass(name, provisioner string) *storagebeta.StorageClass {
defaultReclaimPolicy := v1.PersistentVolumeReclaimDelete

Expand Down