This repository has been archived by the owner on Oct 21, 2020. It is now read-only.

ProvisionController support for shared informers
zcahana committed Jan 14, 2018
1 parent fbf3022 commit 6137203
Showing 1 changed file with 120 additions and 56 deletions.
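This change introduces a ProvisionControllerConfig struct and a NewProvisionControllerWithConfig constructor so that a ProvisionController can be backed by shared informers (cache.SharedInformer) for PersistentVolumeClaims, PersistentVolumes, and StorageClasses, instead of always creating its own private caches and watch connections. Any of the three informers may be left nil, in which case a private informer is created as before, and the existing NewProvisionController signature is kept as a thin wrapper so current callers compile unchanged.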
176 changes: 120 additions & 56 deletions lib/controller/controller.go
@@ -93,7 +93,7 @@ type ProvisionController struct
    volumeSource     cache.ListerWatcher
    volumeController cache.Controller
    classSource      cache.ListerWatcher
-   classReflector   *cache.Reflector
+   classController  cache.Controller

    volumes cache.Store
    claims  cache.Store
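The field swap above is the core of the change: a *cache.Reflector only pumps a ListerWatcher into a store and is not something a shared informer hands out, whereas cache.Controller is what both cache.NewInformer returns (the private case) and SharedInformer.GetController() returns (the shared case), so a single field type now covers both paths.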
@@ -132,6 +132,32 @@ type ProvisionController struct
    hasRunLock *sync.Mutex
}

+// ProvisionControllerConfig encapsulates the configurable attributes of a ProvisionController.
+type ProvisionControllerConfig struct {
+   Client kubernetes.Interface
+
+   // The name of the provisioner for which this controller dynamically
+   // provisions volumes.
+   ProvisionerName string
+
+   // The provisioner the controller will use to provision and delete volumes.
+   Provisioner Provisioner
+
+   // Kubernetes cluster server version
+   KubeVersion string
+
+   // A set of functions to apply further configuration to the controller
+   Options []func(*ProvisionController) error
+
+   // Optional shared informers to use for accessing PersistentVolumeClaims, PersistentVolumes, and StorageClasses.
+   // Each of these may be left nil, in which case a corresponding private (non-shared) informer will be created.
+   // For StorageClasses, the informer must use the versioned resource appropriate for the Kubernetes cluster version
+   // (that is, v1.StorageClass for >= 1.6, and v1beta1.StorageClass for < 1.6).
+   ClaimInformer  cache.SharedInformer
+   VolumeInformer cache.SharedInformer
+   ClassInformer  cache.SharedInformer
+}
+
const (
    // DefaultResyncPeriod is used when option function ResyncPeriod is omitted
    DefaultResyncPeriod = 15 * time.Second
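On clusters at 1.6 or newer, all three informers can come from a single client-go shared informer factory. A minimal sketch, written as if it lived in this package; the provisioner name, the version string, and the Provisioner value p are placeholders, and real code would also hand the factory back to the caller so it can be started (see the sketch after the diff):

    package controller

    import (
        "time"

        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
    )

    // newSharedExample wires one shared informer factory into the controller,
    // letting its caches be reused by other consumers in the same process.
    func newSharedExample(client kubernetes.Interface, p Provisioner) *ProvisionController {
        factory := informers.NewSharedInformerFactory(client, 15*time.Second)

        return NewProvisionControllerWithConfig(ProvisionControllerConfig{
            Client:          client,
            ProvisionerName: "example.com/nfs", // placeholder provisioner name
            Provisioner:     p,
            KubeVersion:     "1.9.0", // placeholder; >= 1.6, so the v1 StorageClass informer matches
            ClaimInformer:   factory.Core().V1().PersistentVolumeClaims().Informer(),
            VolumeInformer:  factory.Core().V1().PersistentVolumes().Informer(),
            ClassInformer:   factory.Storage().V1().StorageClasses().Informer(),
        })
    }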
@@ -275,33 +301,27 @@ func TermLimit(termLimit time.Duration) func(*ProvisionController) error {
    }
}

-// NewProvisionController creates a new provision controller
-func NewProvisionController(
-   client kubernetes.Interface,
-   provisionerName string,
-   provisioner Provisioner,
-   kubeVersion string,
-   options ...func(*ProvisionController) error,
-) *ProvisionController {
+// NewProvisionControllerWithConfig creates a new provision controller using the given configuration.
+func NewProvisionControllerWithConfig(config ProvisionControllerConfig) *ProvisionController {
    identity := uuid.NewUUID()
    broadcaster := record.NewBroadcaster()
-   broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.CoreV1().Events(v1.NamespaceAll)})
+   broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: config.Client.CoreV1().Events(v1.NamespaceAll)})
    var eventRecorder record.EventRecorder
    out, err := exec.Command("hostname").Output()
    if err != nil {
-       eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("%s %s", provisionerName, string(identity))})
+       eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("%s %s", config.ProvisionerName, string(identity))})
    } else {
-       eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("%s %s %s", provisionerName, strings.TrimSpace(string(out)), string(identity))})
+       eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("%s %s %s", config.ProvisionerName, strings.TrimSpace(string(out)), string(identity))})
    }

    // TODO: GetReference fails otherwise
    v1.AddToScheme(scheme.Scheme)

    controller := &ProvisionController{
-       client:          client,
-       provisionerName: provisionerName,
-       provisioner:     provisioner,
-       kubeVersion:     utilversion.MustParseSemantic(kubeVersion),
+       client:          config.Client,
+       provisionerName: config.ProvisionerName,
+       provisioner:     config.Provisioner,
+       kubeVersion:     utilversion.MustParseSemantic(config.KubeVersion),
        identity:        identity,
        eventRecorder:   eventRecorder,
        resyncPeriod:    DefaultResyncPeriod,
@@ -324,84 +344,128 @@ func NewProvisionController(
        hasRunLock: &sync.Mutex{},
    }

-   for _, option := range options {
+   for _, option := range config.Options {
        option(controller)
    }

    controller.claimSource = &cache.ListWatch{
        ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-           return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).List(options)
+           return config.Client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).List(options)
        },
        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-           return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).Watch(options)
+           return config.Client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).Watch(options)
        },
    }
-   controller.claims, controller.claimController = cache.NewInformer(
-       controller.claimSource,
-       &v1.PersistentVolumeClaim{},
-       controller.resyncPeriod,
-       cache.ResourceEventHandlerFuncs{
-           AddFunc:    controller.addClaim,
-           UpdateFunc: controller.updateClaim,
-           DeleteFunc: nil,
-       },
-   )
+   claimEventHandler := cache.ResourceEventHandlerFuncs{
+       AddFunc:    controller.addClaim,
+       UpdateFunc: controller.updateClaim,
+       DeleteFunc: nil,
+   }
+   if config.ClaimInformer != nil {
+       config.ClaimInformer.AddEventHandlerWithResyncPeriod(claimEventHandler, controller.resyncPeriod)
+       controller.claims, controller.claimController =
+           config.ClaimInformer.GetStore(),
+           config.ClaimInformer.GetController()
+   } else {
+       controller.claims, controller.claimController = cache.NewInformer(
+           controller.claimSource,
+           &v1.PersistentVolumeClaim{},
+           controller.resyncPeriod,
+           claimEventHandler,
+       )
+   }

    controller.volumeSource = &cache.ListWatch{
        ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-           return client.CoreV1().PersistentVolumes().List(options)
+           return config.Client.CoreV1().PersistentVolumes().List(options)
        },
        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-           return client.CoreV1().PersistentVolumes().Watch(options)
+           return config.Client.CoreV1().PersistentVolumes().Watch(options)
        },
    }
-   controller.volumes, controller.volumeController = cache.NewInformer(
-       controller.volumeSource,
-       &v1.PersistentVolume{},
-       controller.resyncPeriod,
-       cache.ResourceEventHandlerFuncs{
-           AddFunc:    nil,
-           UpdateFunc: controller.updateVolume,
-           DeleteFunc: nil,
-       },
-   )
+   volumeEventHandler := cache.ResourceEventHandlerFuncs{
+       AddFunc:    nil,
+       UpdateFunc: controller.updateVolume,
+       DeleteFunc: nil,
+   }
+   if config.VolumeInformer != nil {
+       config.VolumeInformer.AddEventHandlerWithResyncPeriod(volumeEventHandler, controller.resyncPeriod)
+       controller.volumes, controller.volumeController =
+           config.VolumeInformer.GetStore(),
+           config.VolumeInformer.GetController()
+   } else {
+       controller.volumes, controller.volumeController = cache.NewInformer(
+           controller.volumeSource,
+           &v1.PersistentVolume{},
+           controller.resyncPeriod,
+           volumeEventHandler,
+       )
+   }

-   controller.classes = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
+   var versionedClassType runtime.Object
    if controller.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.6.0")) {
+       versionedClassType = &storage.StorageClass{}
        controller.classSource = &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-               return client.StorageV1().StorageClasses().List(options)
+               return config.Client.StorageV1().StorageClasses().List(options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-               return client.StorageV1().StorageClasses().Watch(options)
+               return config.Client.StorageV1().StorageClasses().Watch(options)
            },
        }
-       controller.classReflector = cache.NewReflector(
-           controller.classSource,
-           &storage.StorageClass{},
-           controller.classes,
-           controller.resyncPeriod,
-       )
    } else {
+       versionedClassType = &storagebeta.StorageClass{}
        controller.classSource = &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-               return client.StorageV1beta1().StorageClasses().List(options)
+               return config.Client.StorageV1beta1().StorageClasses().List(options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-               return client.StorageV1beta1().StorageClasses().Watch(options)
+               return config.Client.StorageV1beta1().StorageClasses().Watch(options)
            },
        }
-       controller.classReflector = cache.NewReflector(
+   }
+   classEventHandler := cache.ResourceEventHandlerFuncs{
+       // We don't need an actual event handler for StorageClasses,
+       // but we must pass a non-nil one to cache.NewInformer()
+       AddFunc:    nil,
+       UpdateFunc: nil,
+       DeleteFunc: nil,
+   }
+   if config.ClassInformer != nil {
+       // no resource event handler needed for StorageClasses
+       controller.classes, controller.classController =
+           config.ClassInformer.GetStore(),
+           config.ClassInformer.GetController()
+   } else {
+       controller.classes, controller.classController = cache.NewInformer(
            controller.classSource,
-           &storagebeta.StorageClass{},
-           controller.classes,
+           versionedClassType,
            controller.resyncPeriod,
+           classEventHandler,
        )
    }

    return controller
}

+// NewProvisionController creates a new provision controller using
+// the given configuration parameters and with private (non-shared) informers.
+func NewProvisionController(
+   client kubernetes.Interface,
+   provisionerName string,
+   provisioner Provisioner,
+   kubeVersion string,
+   options ...func(*ProvisionController) error,
+) *ProvisionController {
+   return NewProvisionControllerWithConfig(ProvisionControllerConfig{
+       Client:          client,
+       ProvisionerName: provisionerName,
+       Provisioner:     provisioner,
+       KubeVersion:     kubeVersion,
+       Options:         options,
+   })
+}
+
// Run starts all of this controller's control loops
func (ctrl *ProvisionController) Run(stopCh <-chan struct{}) {
    glog.Infof("Starting provisioner controller %s!", string(ctrl.identity))
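Because the legacy constructor above now simply delegates, these two calls are equivalent (same placeholder names as in the earlier sketch):

    // Legacy form: private informers are created internally.
    c1 := NewProvisionController(client, "example.com/nfs", p, "1.9.0")

    // Config form with no shared informers set behaves identically.
    c2 := NewProvisionControllerWithConfig(ProvisionControllerConfig{
        Client:          client,
        ProvisionerName: "example.com/nfs",
        Provisioner:     p,
        KubeVersion:     "1.9.0",
    })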
Expand All @@ -410,7 +474,7 @@ func (ctrl *ProvisionController) Run(stopCh <-chan struct{}) {
ctrl.hasRunLock.Unlock()
go ctrl.claimController.Run(stopCh)
go ctrl.volumeController.Run(stopCh)
go ctrl.classReflector.Run(stopCh)
go ctrl.classController.Run(stopCh)
<-stopCh
}
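One subtlety in Run for the shared case: in client-go's implementation, the cache.Controller returned by SharedInformer.GetController() is a no-op wrapper, so the three go ...Run(stopCh) calls above are harmless when shared informers were supplied; they do not open a second set of watches. Starting the shared informers is the caller's responsibility. A sketch, continuing the factory example (assumes ctrl and factory from there, and k8s.io/client-go/tools/cache imported as cache):

    stopCh := make(chan struct{})
    defer close(stopCh)

    go ctrl.Run(stopCh) // provisioning loops; the informer Runs inside are no-ops here

    // factory.Start is non-blocking and starts every informer requested so far.
    factory.Start(stopCh)

    // Optionally wait for the shared caches to warm up before relying on them.
    cache.WaitForCacheSync(stopCh,
        factory.Core().V1().PersistentVolumeClaims().Informer().HasSynced,
        factory.Core().V1().PersistentVolumes().Informer().HasSynced,
        factory.Storage().V1().StorageClasses().Informer().HasSynced,
    )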

