Skip to content
This repository has been archived by the owner on Oct 21, 2020. It is now read-only.

Commit

Permalink
ProvisionController support for shared informers
Browse files Browse the repository at this point in the history
  • Loading branch information
zcahana committed Jan 21, 2018
1 parent 2572be1 commit 99c887f
Showing 1 changed file with 118 additions and 33 deletions.
151 changes: 118 additions & 33 deletions lib/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,13 @@ type ProvisionController struct {

claimSource cache.ListerWatcher
claimController cache.Controller
claimInformer cache.SharedInformer
volumeSource cache.ListerWatcher
volumeController cache.Controller
volumeInformer cache.SharedInformer
classSource cache.ListerWatcher
classReflector *cache.Reflector
classController cache.Controller
classInformer cache.SharedInformer

volumes cache.Store
claims cache.Store
Expand Down Expand Up @@ -275,7 +278,46 @@ func TermLimit(termLimit time.Duration) func(*ProvisionController) error {
}
}

// NewProvisionController creates a new provision controller
// ClaimsInformer sets the informer to use for accessing PersistentVolumeClaims.
// Defaults to using a private (non-shared) informer.
func ClaimsInformer(informer cache.SharedInformer) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
return errRuntime
}
c.claimInformer = informer
return nil
}
}

// VolumesInformer sets the informer to use for accessing PersistentVolumes.
// Defaults to using a private (non-shared) informer.
func VolumesInformer(informer cache.SharedInformer) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
return errRuntime
}
c.volumeInformer = informer
return nil
}
}

// ClassesInformer sets the informer to use for accessing StorageClasses.
// The informer must use the versioned resource appropriate for the Kubernetes cluster version
// (that is, v1.StorageClass for >= 1.6, and v1beta1.StorageClass for < 1.6).
// Defaults to using a private (non-shared) informer.
func ClassesInformer(informer cache.SharedInformer) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
return errRuntime
}
c.classInformer = informer
return nil
}
}

// NewProvisionController creates a new provision controller using
// the given configuration parameters and with private (non-shared) informers.
func NewProvisionController(
client kubernetes.Interface,
provisionerName string,
Expand Down Expand Up @@ -328,6 +370,9 @@ func NewProvisionController(
option(controller)
}

// ----------------------
// PersistentVolumeClaims

controller.claimSource = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).List(options)
Expand All @@ -336,16 +381,30 @@ func NewProvisionController(
return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).Watch(options)
},
}
controller.claims, controller.claimController = cache.NewInformer(
controller.claimSource,
&v1.PersistentVolumeClaim{},
controller.resyncPeriod,
cache.ResourceEventHandlerFuncs{
AddFunc: controller.addClaim,
UpdateFunc: controller.updateClaim,
DeleteFunc: nil,
},
)

claimHandler := cache.ResourceEventHandlerFuncs{
AddFunc: controller.addClaim,
UpdateFunc: controller.updateClaim,
DeleteFunc: nil,
}

if controller.claimInformer != nil {
controller.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.resyncPeriod)
controller.claims, controller.claimController =
controller.claimInformer.GetStore(),
controller.claimInformer.GetController()
} else {
controller.claims, controller.claimController =
cache.NewInformer(
controller.claimSource,
&v1.PersistentVolumeClaim{},
controller.resyncPeriod,
claimHandler,
)
}

// -----------------
// PersistentVolumes

controller.volumeSource = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
Expand All @@ -355,19 +414,34 @@ func NewProvisionController(
return client.CoreV1().PersistentVolumes().Watch(options)
},
}
controller.volumes, controller.volumeController = cache.NewInformer(
controller.volumeSource,
&v1.PersistentVolume{},
controller.resyncPeriod,
cache.ResourceEventHandlerFuncs{
AddFunc: nil,
UpdateFunc: controller.updateVolume,
DeleteFunc: nil,
},
)

controller.classes = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
volumeHandler := cache.ResourceEventHandlerFuncs{
AddFunc: nil,
UpdateFunc: controller.updateVolume,
DeleteFunc: nil,
}

if controller.volumeInformer != nil {
controller.volumeInformer.AddEventHandlerWithResyncPeriod(volumeHandler, controller.resyncPeriod)
controller.volumes, controller.volumeController =
controller.volumeInformer.GetStore(),
controller.volumeInformer.GetController()
} else {
controller.volumes, controller.volumeController =
cache.NewInformer(
controller.volumeSource,
&v1.PersistentVolume{},
controller.resyncPeriod,
volumeHandler,
)
}

// --------------
// StorageClasses

var versionedClassType runtime.Object
if controller.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.6.0")) {
versionedClassType = &storage.StorageClass{}
controller.classSource = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.StorageV1().StorageClasses().List(options)
Expand All @@ -376,13 +450,8 @@ func NewProvisionController(
return client.StorageV1().StorageClasses().Watch(options)
},
}
controller.classReflector = cache.NewReflector(
controller.classSource,
&storage.StorageClass{},
controller.classes,
controller.resyncPeriod,
)
} else {
versionedClassType = &storagebeta.StorageClass{}
controller.classSource = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.StorageV1beta1().StorageClasses().List(options)
Expand All @@ -391,11 +460,27 @@ func NewProvisionController(
return client.StorageV1beta1().StorageClasses().Watch(options)
},
}
controller.classReflector = cache.NewReflector(
}

classHandler := cache.ResourceEventHandlerFuncs{
// We don't need an actual event handler for StorageClasses,
// but we must pass a non-nil one to cache.NewInformer()
AddFunc: nil,
UpdateFunc: nil,
DeleteFunc: nil,
}

if controller.classInformer != nil {
// no resource event handler needed for StorageClasses
controller.classes, controller.classController =
controller.classInformer.GetStore(),
controller.classInformer.GetController()
} else {
controller.classes, controller.classController = cache.NewInformer(
controller.classSource,
&storagebeta.StorageClass{},
controller.classes,
versionedClassType,
controller.resyncPeriod,
classHandler,
)
}

Expand All @@ -410,7 +495,7 @@ func (ctrl *ProvisionController) Run(stopCh <-chan struct{}) {
ctrl.hasRunLock.Unlock()
go ctrl.claimController.Run(stopCh)
go ctrl.volumeController.Run(stopCh)
go ctrl.classReflector.Run(stopCh)
go ctrl.classController.Run(stopCh)
<-stopCh
}

Expand Down

0 comments on commit 99c887f

Please sign in to comment.